From ac2742534184eb64e3b28ccece50b7cab5a8050a Mon Sep 17 00:00:00 2001
From: Arul Ajmani
Date: Tue, 9 Aug 2022 17:33:13 -0400
Subject: [PATCH] kv: ensure secondary tenants route follower reads to the
 closest replica

The DistSender uses node locality information to rank replicas of a
range by latency. Previously, this locality information was read off a
node descriptor available in Gossip. Unfortunately, secondary tenants
do not have access to Gossip, and as such would end up randomizing this
list of replicas. This manifested itself through unpredictable
latencies when running follower reads.

With this patch we are no longer susceptible to this hazard: instead of
requiring a node descriptor from Gossip, the DistSender is now
instantiated with locality information directly. However, we do still
use Gossip to get the current node's ID when ranking replicas. This is
done to ascertain whether there is a local replica, and if there is, to
always route to it. Unfortunately, because secondary tenants don't have
access to Gossip, they can't conform to these semantics. They're
susceptible to a hazard where a request may be routed to another
replica in the same locality tier as the client even though the client
has a local replica as well. This shouldn't be a concern in practice
given the diversity heuristic.

Resolves #81000

Release note (bug fix): Fixed an issue where secondary tenants could
route follower reads to a random, faraway replica instead of a closer
one.
---
 pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel  |   2 +
 .../kvfollowerreadsccl/followerreads_test.go  | 207 ++++++++++++++++++
 pkg/kv/kvclient/kvcoord/dist_sender.go        |  16 +-
 .../kvclient/kvcoord/dist_sender_rangefeed.go |   2 +-
 pkg/kv/kvclient/kvcoord/replica_slice.go      |  24 +-
 pkg/kv/kvclient/kvcoord/replica_slice_test.go |  22 +-
 pkg/kv/kvserver/BUILD.bazel                   |   1 +
 pkg/kv/kvserver/replica_follower_read.go      |   3 +-
 pkg/server/server.go                          |   1 +
 pkg/server/tenant.go                          |   3 +-
 pkg/server/testserver.go                      |   5 +
 pkg/sql/physicalplan/replicaoracle/oracle.go  |   4 +-
 pkg/testutils/serverutils/test_tenant_shim.go |   3 +
 13 files changed, 266 insertions(+), 27 deletions(-)

diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel b/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
index fd72ef6ebeab..156bb45eb82b 100644
--- a/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
+++ b/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
@@ -45,6 +45,7 @@ go_test(
     embed = [":kvfollowerreadsccl"],
     deps = [
         "//pkg/base",
+        "//pkg/ccl/kvccl/kvtenantccl",
         "//pkg/ccl/utilccl",
         "//pkg/keys",
         "//pkg/kv",
@@ -57,6 +58,7 @@ go_test(
         "//pkg/rpc",
         "//pkg/security/securityassets",
         "//pkg/security/securitytest",
+        "//pkg/security/username",
         "//pkg/server",
         "//pkg/settings/cluster",
         "//pkg/sql",
diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
index d208aada388d..3c9088528f08 100644
--- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
+++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -10,11 +10,16 @@ package kvfollowerreadsccl
 
 import (
 	"context"
+	gosql "database/sql"
+	"fmt"
 	"math"
+	"net/url"
 	"testing"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
+	// Blank import kvtenantccl so that we can create a tenant.
+ _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" @@ -24,10 +29,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -690,3 +697,203 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) { require.NoError(t, err) require.Greater(t, followerReadsCountAfter, followerReadsCountBefore) } + +// TestSecondaryTenantFollowerReadsRouting ensures that secondary tenants route +// their requests to the nearest replica. The test runs two versions -- one +// where accurate latency information between nodes is available and another +// where it needs to be estimated using node localities. +func TestSecondaryTenantFollowerReadsRouting(t *testing.T) { + defer utilccl.TestingEnableEnterprise()() + defer leaktest.AfterTest(t)() + defer log.TestingSetRedactable(false)() + + testutils.RunTrueAndFalse(t, "valid-latency-func", func(t *testing.T, validLatencyFunc bool) { + const numNodes = 4 + + serverArgs := make(map[int]base.TestServerArgs) + localities := make(map[int]roachpb.Locality) + for i := 0; i < numNodes; i++ { + regionName := fmt.Sprintf("region_%d", i) + if i == 3 { + // Make it such that n4 and n2 are in the same region. Below, we'll + // expect a follower read from n4 to be served by n2 because they're + // in the same locality (when validLatencyFunc is false). + regionName = fmt.Sprintf("region_%d", 1) + } + locality := roachpb.Locality{ + Tiers: []roachpb.Tier{{Key: "region", Value: regionName}}, + } + localities[i] = locality + serverArgs[i] = base.TestServerArgs{ + Locality: localities[i], + DisableDefaultTestTenant: true, // we'll create one ourselves below. + } + } + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: serverArgs, + }) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + + historicalQuery := `SELECT * FROM t.test AS OF SYSTEM TIME follower_read_timestamp() WHERE k=2` + recCh := make(chan tracingpb.Recording, 1) + + var tenants [numNodes]serverutils.TestTenantInterface + for i := 0; i < numNodes; i++ { + knobs := base.TestingKnobs{} + if i == 3 { // n4 + knobs = base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + DontConsiderConnHealth: true, + // For the validLatencyFunc=true version of the test, the client + // pretends to have a low latency connection to n2. As a result, we + // expect n2 to be used for follower reads originating from n4. + // + // For the variant where no latency information is available, we + // expect n2 to serve follower reads as well, but because it + // is in the same locality as the client. 
+						LatencyFunc: func(addr string) (time.Duration, bool) {
+							if !validLatencyFunc {
+								return 0, false
+							}
+							if addr == tc.Server(1).RPCAddr() {
+								return time.Millisecond, true
+							}
+							return 100 * time.Millisecond, true
+						},
+					},
+					SQLExecutor: &sql.ExecutorTestingKnobs{
+						WithStatementTrace: func(trace tracingpb.Recording, stmt string) {
+							if stmt == historicalQuery {
+								recCh <- trace
+							}
+						},
+					},
+				}
+			}
+			tt, err := tc.Server(i).StartTenant(ctx, base.TestTenantArgs{
+				TenantID:     serverutils.TestTenantID(),
+				Locality:     localities[i],
+				TestingKnobs: knobs,
+			})
+			require.NoError(t, err)
+			tenants[i] = tt
+		}
+
+		// Speed up closing of timestamps in order to sleep less below before we can
+		// use follower_read_timestamp(). Note that we need to override the setting
+		// for the tenant as well, because the builtin is run in the tenant's sql pod.
+		systemSQL := sqlutils.MakeSQLRunner(tc.Conns[0])
+		systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`)
+		systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`)
+		systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.1s'`)
+		systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`)
+		systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`)
+		systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.1s'`)
+		// We're making assertions on traces collected by the tenant using log lines
+		// in KV so we must ensure they're not redacted.
+		systemSQL.Exec(t, `SET CLUSTER SETTING server.secondary_tenants.redact_trace = 'false'`)
+
+		// Wait until all tenant servers are aware of the setting override.
+		testutils.SucceedsSoon(t, func() error {
+			settingNames := []string{
+				"kv.closed_timestamp.target_duration", "kv.closed_timestamp.side_transport_interval", "kv.closed_timestamp.propagation_slack",
+			}
+			for _, settingName := range settingNames {
+				for i := 0; i < numNodes; i++ {
+					pgURL, cleanup := sqlutils.PGUrl(t, tenants[i].SQLAddr(), "Tenant", url.User(username.RootUser))
+					defer cleanup()
+					db, err := gosql.Open("postgres", pgURL.String())
+					if err != nil {
+						t.Fatal(err)
+					}
+					defer db.Close()
+
+					var val string
+					err = db.QueryRow(
+						fmt.Sprintf("SHOW CLUSTER SETTING %s", settingName),
+					).Scan(&val)
+					require.NoError(t, err)
+					if val != "00:00:00.1" {
+						return errors.Errorf("tenant server %d is still waiting for %s update: currently %s",
+							i,
+							settingName,
+							val,
+						)
+					}
+				}
+			}
+			return nil
+		})
+
+		pgURL, cleanupPGUrl := sqlutils.PGUrl(
+			t, tenants[3].SQLAddr(), "Tenant", url.User(username.RootUser),
+		)
+		defer cleanupPGUrl()
+		tenantSQLDB, err := gosql.Open("postgres", pgURL.String())
+		require.NoError(t, err)
+		defer tenantSQLDB.Close()
+		tenantSQL := sqlutils.MakeSQLRunner(tenantSQLDB)
+
+		tenantSQL.Exec(t, `CREATE DATABASE t`)
+		tenantSQL.Exec(t, `CREATE TABLE t.test (k INT PRIMARY KEY)`)
+
+		startKey := keys.MakeSQLCodec(serverutils.TestTenantID()).TenantPrefix()
+		tc.AddVotersOrFatal(t, startKey, tc.Target(1), tc.Target(2))
+		desc := tc.LookupRangeOrFatal(t, startKey)
+		require.Equal(t, []roachpb.ReplicaDescriptor{
+			{NodeID: 1, StoreID: 1, ReplicaID: 1},
+			{NodeID: 2, StoreID: 2, ReplicaID: 2},
+			{NodeID: 3, StoreID: 3, ReplicaID: 3},
+		}, desc.Replicas().Descriptors())
+
+		// Sleep so that we can perform follower reads. The read timestamp needs to be
+		// above the timestamp when the table was created.
+ log.Infof(ctx, "test sleeping for the follower read timestamps to pass the table creation timestamp...") + time.Sleep(300 * time.Millisecond) + log.Infof(ctx, "test sleeping... done") + + getFollowerReadCounts := func() [numNodes]int64 { + var counts [numNodes]int64 + for i := range tc.Servers { + err := tc.Servers[i].Stores().VisitStores(func(s *kvserver.Store) error { + counts[i] = s.Metrics().FollowerReadsCount.Count() + return nil + }) + require.NoError(t, err) + } + return counts + } + + // Check that the cache was indeed populated. + tenantSQL.Exec(t, `SELECT * FROM t.test WHERE k = 1`) + tablePrefix := keys.MustAddr(keys.MakeSQLCodec(serverutils.TestTenantID()).TenantPrefix()) + cache := tenants[3].DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache() + entry := cache.GetCached(ctx, tablePrefix, false /* inverted */) + require.NotNil(t, entry) + require.False(t, entry.Lease().Empty()) + require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID) + require.Equal(t, []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + }, entry.Desc().Replicas().Descriptors()) + + followerReadCountsBefore := getFollowerReadCounts() + tenantSQL.Exec(t, historicalQuery) + followerReadsCountsAfter := getFollowerReadCounts() + + rec := <-recCh + // Look at the trace and check that we've served a follower read. + require.True(t, kv.OnlyFollowerReads(rec), "query was served through follower reads: %s", rec) + + for i := 0; i < numNodes; i++ { + if i == 1 { // n2 + require.Greater(t, followerReadsCountsAfter[i], followerReadCountsBefore[i]) + continue + } + require.Equal(t, followerReadsCountsAfter[i], followerReadCountsBefore[i]) + } + }) +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index bd2dbd7efbd2..921afe0844f9 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -336,6 +336,11 @@ type DistSender struct { // LatencyFunc is used to estimate the latency to other nodes. latencyFunc LatencyFunc + // locality is the description of the topography of the server on which the + // DistSender is running. It is used to estimate the latency to other nodes + // in the absence of a latency function. + locality roachpb.Locality + // If set, the DistSender will try the replicas in the order they appear in // the descriptor, instead of trying to reorder them by latency. The knob // only applies to requests sent with the LEASEHOLDER routing policy. @@ -386,6 +391,10 @@ type DistSenderConfig struct { FirstRangeProvider FirstRangeProvider RangeDescriptorDB rangecache.RangeDescriptorDB + // Locality is the description of the topography of the server on which the + // DistSender is running. + Locality roachpb.Locality + // KVInterceptor is set for tenants; when set, information about all // BatchRequests and BatchResponses are passed through this interceptor, which // can potentially throttle requests. @@ -405,6 +414,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { nodeDescs: cfg.NodeDescs, metrics: makeDistSenderMetrics(), kvInterceptor: cfg.KVInterceptor, + locality: cfg.Locality, } if ds.st == nil { ds.st = cluster.MakeTestingClusterSettings() @@ -545,7 +555,7 @@ func (ds *DistSender) FirstRange() (*roachpb.RangeDescriptor, error) { // getNodeID attempts to return the local node ID. It returns 0 if the DistSender // does not have access to the Gossip network. 
 func (ds *DistSender) getNodeID() roachpb.NodeID {
-	// TODO(nvanbenschoten): open an issue about the effect of this.
+	// TODO(arul): Open a new issue about the new effect of this.
 	g, ok := ds.nodeDescs.(*gossip.Gossip)
 	if !ok {
 		return 0
@@ -1959,7 +1969,7 @@ func (ds *DistSender) sendToReplicas(
 	// First order by latency, then move the leaseholder to the front of the
 	// list, if it is known.
 	if !ds.dontReorderReplicas {
-		replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc)
+		replicas.OptimizeReplicaOrder(ds.getNodeID(), ds.latencyFunc, ds.locality)
 	}
 
 	idx := -1
@@ -1978,7 +1988,7 @@ func (ds *DistSender) sendToReplicas(
 	case roachpb.RoutingPolicy_NEAREST:
 		// Order by latency.
 		log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
-		replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc)
+		replicas.OptimizeReplicaOrder(ds.getNodeID(), ds.latencyFunc, ds.locality)
 
 	default:
 		log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index 5106f5e209aa..3f09a3334424 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -431,7 +431,7 @@ func (ds *DistSender) singleRangeFeed(
 	if err != nil {
 		return args.Timestamp, err
 	}
-	replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)
+	replicas.OptimizeReplicaOrder(ds.getNodeID(), latencyFn, ds.locality)
 	// The RangeFeed is not used for system critical traffic so use a DefaultClass
 	// connection regardless of the range.
 	opts := SendOptions{class: connectionClass(&ds.st.SV)}
diff --git a/pkg/kv/kvclient/kvcoord/replica_slice.go b/pkg/kv/kvclient/kvcoord/replica_slice.go
index fc3e010ffdb1..2213babcb132 100644
--- a/pkg/kv/kvclient/kvcoord/replica_slice.go
+++ b/pkg/kv/kvclient/kvcoord/replica_slice.go
@@ -188,21 +188,23 @@ type LatencyFunc func(string) (time.Duration, bool)
 // they're to be used for sending RPCs (meaning in the order in which
 // they'll be probed for the lease). Lower latency and "closer"
 // (matching in more attributes) replicas are ordered first. If the
-// current node is a replica, then it'll be the first one.
+// current node has a replica (and the current node's ID is supplied),
+// then it'll be the first one.
 //
-// nodeDesc is the descriptor of the current node. It can be nil, in
-// which case information about the current descriptor is not used in
-// optimizing the order.
+// nodeID is the ID of the current node. It can be 0, in which case
+// information about the current node is not used in optimizing the order.
+// Similarly, latencyFn can be nil, in which case it will not be used.
 //
 // Note that this method is not concerned with any information the
 // node might have about who the lease holder might be. If the
 // leaseholder is known by the caller, the caller will move it to the
 // front if appropriate.
 func (rs ReplicaSlice) OptimizeReplicaOrder(
-	nodeDesc *roachpb.NodeDescriptor, latencyFn LatencyFunc,
+	nodeID roachpb.NodeID, latencyFn LatencyFunc, locality roachpb.Locality,
 ) {
-	// If we don't know which node we're on, send the RPCs randomly.
-	if nodeDesc == nil {
+	// If we don't know which node we're on or its locality, and we don't have
+	// latency information to other nodes, send the RPCs randomly.
+	if nodeID == 0 && latencyFn == nil && len(locality.Tiers) == 0 {
 		shuffle.Shuffle(rs)
 		return
 	}
@@ -214,10 +216,10 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
 			return false // i == j
 		}
 		// Replicas on the local node sort first.
-		if rs[i].NodeID == nodeDesc.NodeID {
+		if rs[i].NodeID == nodeID {
 			return true // i < j
 		}
-		if rs[j].NodeID == nodeDesc.NodeID {
+		if rs[j].NodeID == nodeID {
 			return false // j < i
 		}
 
@@ -228,8 +230,8 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
 				return latencyI < latencyJ
 			}
 		}
-		attrMatchI := localityMatch(nodeDesc.Locality.Tiers, rs[i].locality())
-		attrMatchJ := localityMatch(nodeDesc.Locality.Tiers, rs[j].locality())
+		attrMatchI := localityMatch(locality.Tiers, rs[i].locality())
+		attrMatchJ := localityMatch(locality.Tiers, rs[j].locality())
 		// Longer locality matches sort first (the assumption is that
 		// they'll have better latencies).
 		return attrMatchI > attrMatchJ
diff --git a/pkg/kv/kvclient/kvcoord/replica_slice_test.go b/pkg/kv/kvclient/kvcoord/replica_slice_test.go
index c234edc69cc5..fc385b245a1d 100644
--- a/pkg/kv/kvclient/kvcoord/replica_slice_test.go
+++ b/pkg/kv/kvclient/kvcoord/replica_slice_test.go
@@ -175,7 +175,10 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
 	defer log.Scope(t).Close(t)
 	testCases := []struct {
 		name string
-		node *roachpb.NodeDescriptor
+		// nodeID of the DistSender.
+		nodeID roachpb.NodeID
+		// locality of the DistSender.
+		locality roachpb.Locality
 		// map from node address (see nodeDesc()) to latency to that node.
 		latencies map[string]time.Duration
 		slice ReplicaSlice
@@ -186,8 +189,9 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
 		expOrdered []roachpb.NodeID
 	}{
 		{
-			name: "order by locality matching",
-			node: nodeDesc(t, 1, []string{"country=us", "region=west", "city=la"}),
+			name: "order by locality matching",
+			nodeID: 1,
+			locality: locality(t, []string{"country=us", "region=west", "city=la"}),
 			slice: ReplicaSlice{
 				info(t, 1, 1, []string{"country=us", "region=west", "city=la"}),
 				info(t, 2, 2, []string{"country=us", "region=west", "city=sf"}),
@@ -198,8 +202,9 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
 			expOrdered: []roachpb.NodeID{1, 2, 4, 3},
 		},
 		{
-			name: "order by latency",
-			node: nodeDesc(t, 1, []string{"country=us", "region=west", "city=la"}),
+			name: "order by latency",
+			nodeID: 1,
+			locality: locality(t, []string{"country=us", "region=west", "city=la"}),
 			latencies: map[string]time.Duration{
 				"2:26257": time.Hour,
 				"3:26257": time.Minute,
@@ -217,8 +222,9 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
 			// Test that replicas on the local node sort first, regardless of factors
 			// like their latency measurement (in production they won't have any
 			// latency measurement).
-			name: "local node comes first",
-			node: nodeDesc(t, 1, nil),
+			name: "local node comes first",
+			nodeID: 1,
+			locality: locality(t, nil),
 			latencies: map[string]time.Duration{
 				"1:26257": 10 * time.Hour,
 				"2:26257": time.Hour,
@@ -246,7 +252,7 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
 			}
 			// Randomize the input order, as it's not supposed to matter.
 			shuffle.Shuffle(test.slice)
-			test.slice.OptimizeReplicaOrder(test.node, latencyFn)
+			test.slice.OptimizeReplicaOrder(test.nodeID, latencyFn, test.locality)
 			var sortedNodes []roachpb.NodeID
 			sortedNodes = append(sortedNodes, test.slice[0].NodeID)
 			for i := 1; i < len(test.slice); i++ {
diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index e44dfc348f6a..1b2367fe6c6d 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -202,6 +202,7 @@ go_library(
         "//pkg/util/uuid",
        "@com_github_cockroachdb_errors//:errors",
        "@com_github_cockroachdb_errors//oserror",
+       "@com_github_cockroachdb_errors//safedetails",
        "@com_github_cockroachdb_logtags//:logtags",
        "@com_github_cockroachdb_redact//:redact",
        "@com_github_gogo_protobuf//proto",
diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go
index b5a952c7bd18..6174c23566ab 100644
--- a/pkg/kv/kvserver/replica_follower_read.go
+++ b/pkg/kv/kvserver/replica_follower_read.go
@@ -19,6 +19,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/errors/safedetails"
 )
 
 // FollowerReadsEnabled controls whether replicas attempt to serve follower
@@ -101,7 +102,7 @@ func (r *Replica) canServeFollowerReadRLocked(ctx context.Context, ba *roachpb.B
 	//
 	// TODO(tschottdorf): once a read for a timestamp T has been served, the replica may
 	// serve reads for that and smaller timestamps forever.
-	log.Eventf(ctx, "%s; query timestamp below closed timestamp by %s", kvbase.FollowerReadServingMsg, -tsDiff)
+	log.Eventf(ctx, "%s; query timestamp below closed timestamp by %s", safedetails.Safe(kvbase.FollowerReadServingMsg), -tsDiff)
 	r.store.metrics.FollowerReadsCount.Inc(1)
 	return true
 }
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 3151b5496bbf..32d3114ddf22 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -365,6 +365,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		NodeDialer:         nodeDialer,
 		FirstRangeProvider: g,
 		TestingKnobs:       clientTestingKnobs,
+		Locality:           cfg.Locality,
 	}
 	distSender := kvcoord.NewDistSender(distSenderCfg)
 	registry.AddMetricStruct(distSender.Metrics())
diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go
index 896ac56b7a17..5c6492ad2a68 100644
--- a/pkg/server/tenant.go
+++ b/pkg/server/tenant.go
@@ -400,7 +400,7 @@ func makeTenantSQLServerArgs(
 	})
 
 	var dsKnobs kvcoord.ClientTestingKnobs
-	if dsKnobsP, ok := baseCfg.TestingKnobs.DistSQL.(*kvcoord.ClientTestingKnobs); ok {
+	if dsKnobsP, ok := baseCfg.TestingKnobs.KVClient.(*kvcoord.ClientTestingKnobs); ok {
 		dsKnobs = *dsKnobsP
 	}
 	rpcRetryOptions := base.DefaultRetryOptions()
@@ -440,6 +440,7 @@ func makeTenantSQLServerArgs(
 		RangeDescriptorDB: tenantConnect,
 		KVInterceptor:     costController,
 		TestingKnobs:      dsKnobs,
+		Locality:          baseCfg.Locality,
 	}
 	ds := kvcoord.NewDistSender(dsCfg)
 
diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go
index acfefbb3bdf3..3b4b39958c30 100644
--- a/pkg/server/testserver.go
+++ b/pkg/server/testserver.go
@@ -663,6 +663,11 @@ func (t *TestTenant) DistSQLServer() interface{} {
 	return t.SQLServer.distSQLServer
 }
 
+// DistSenderI is part of the TestTenantInterface.
+func (t *TestTenant) DistSenderI() interface{} {
+	return t.SQLServer.execCfg.DistSender
+}
+
 // RPCContext is part of TestTenantInterface.
 func (t *TestTenant) RPCContext() *rpc.Context {
 	return t.execCfg.RPCContext
diff --git a/pkg/sql/physicalplan/replicaoracle/oracle.go b/pkg/sql/physicalplan/replicaoracle/oracle.go
index 0523ce1402ec..b2817c87c778 100644
--- a/pkg/sql/physicalplan/replicaoracle/oracle.go
+++ b/pkg/sql/physicalplan/replicaoracle/oracle.go
@@ -175,7 +175,7 @@ func (o *closestOracle) ChoosePreferredReplica(
 	if err != nil {
 		return roachpb.ReplicaDescriptor{}, err
 	}
-	replicas.OptimizeReplicaOrder(&o.nodeDesc, o.latencyFunc)
+	replicas.OptimizeReplicaOrder(o.nodeDesc.NodeID, o.latencyFunc, o.nodeDesc.Locality)
 	return replicas[0].ReplicaDescriptor, nil
 }
 
@@ -229,7 +229,7 @@ func (o *binPackingOracle) ChoosePreferredReplica(
 	if err != nil {
 		return roachpb.ReplicaDescriptor{}, err
 	}
-	replicas.OptimizeReplicaOrder(&o.nodeDesc, o.latencyFunc)
+	replicas.OptimizeReplicaOrder(o.nodeDesc.NodeID, o.latencyFunc, o.nodeDesc.Locality)
 
 	// Look for a replica that has been assigned some ranges, but it's not yet full.
 	minLoad := int(math.MaxInt32)
diff --git a/pkg/testutils/serverutils/test_tenant_shim.go b/pkg/testutils/serverutils/test_tenant_shim.go
index 9691994ec067..dfb88bfb9473 100644
--- a/pkg/testutils/serverutils/test_tenant_shim.go
+++ b/pkg/testutils/serverutils/test_tenant_shim.go
@@ -64,6 +64,9 @@ type TestTenantInterface interface {
 	// DistSQLServer returns the *distsql.ServerImpl as an interface{}.
 	DistSQLServer() interface{}
 
+	// DistSenderI returns the *kvcoord.DistSender as an interface{}.
+	DistSenderI() interface{}
+
 	// JobRegistry returns the *jobs.Registry as an interface{}.
 	JobRegistry() interface{}
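
For illustration, the replica-ordering heuristic this patch threads through OptimizeReplicaOrder can be reduced to a small, self-contained Go sketch. The types and helpers below (tier, replicaInfo, optimizeOrder) are simplified stand-ins rather than the real roachpb/kvcoord definitions; the sketch only assumes the rules stated above: a replica on the caller's own node sorts first, lower measured latency wins when a latency function is available, otherwise the longest locality-tier match wins, and with no node ID, latency function, or locality the order is left to a random shuffle.

// replicaorder_sketch.go: a minimal, standalone approximation of the ordering
// heuristic described in this patch. Not CockroachDB code; types are simplified.
package main

import (
	"fmt"
	"math/rand"
	"sort"
	"time"
)

type tier struct{ key, value string }

type replicaInfo struct {
	nodeID int
	addr   string
	tiers  []tier
}

// localityMatch counts consecutive matching tiers, mirroring the
// "longer locality matches sort first" rule.
func localityMatch(a, b []tier) int {
	match := 0
	for i := 0; i < len(a) && i < len(b); i++ {
		if a[i] != b[i] {
			break
		}
		match++
	}
	return match
}

// optimizeOrder reorders replicas for a client with the given node ID and
// locality. latency may be nil, e.g. when no RPC measurements exist yet.
func optimizeOrder(
	replicas []replicaInfo, nodeID int, clientTiers []tier,
	latency func(addr string) (time.Duration, bool),
) {
	// Without a node ID, latency information, or locality, any order is as
	// good as any other: shuffle to spread load across replicas.
	if nodeID == 0 && latency == nil && len(clientTiers) == 0 {
		rand.Shuffle(len(replicas), func(i, j int) {
			replicas[i], replicas[j] = replicas[j], replicas[i]
		})
		return
	}
	sort.Slice(replicas, func(i, j int) bool {
		// A replica on the client's own node always sorts first.
		if replicas[i].nodeID == nodeID {
			return true
		}
		if replicas[j].nodeID == nodeID {
			return false
		}
		// Prefer lower measured latency when measurements are available.
		if latency != nil {
			li, oki := latency(replicas[i].addr)
			lj, okj := latency(replicas[j].addr)
			if oki && okj {
				return li < lj
			}
		}
		// Otherwise fall back to the longest locality-tier match.
		return localityMatch(clientTiers, replicas[i].tiers) >
			localityMatch(clientTiers, replicas[j].tiers)
	})
}

func main() {
	// The test's topology: replicas on n1/n2/n3, and a client (a tenant pod
	// colocated with n4) in region_1 with no local replica and no latency
	// measurements. n2 shares the client's region, so it sorts first.
	client := []tier{{"region", "region_1"}}
	replicas := []replicaInfo{
		{nodeID: 1, addr: "n1", tiers: []tier{{"region", "region_0"}}},
		{nodeID: 2, addr: "n2", tiers: []tier{{"region", "region_1"}}},
		{nodeID: 3, addr: "n3", tiers: []tier{{"region", "region_2"}}},
	}
	optimizeOrder(replicas, 0 /* no local node */, client, nil /* no latency info */)
	fmt.Println("first choice:", replicas[0].addr) // first choice: n2
}

Applied to the topology used by TestSecondaryTenantFollowerReadsRouting, the sketch picks n2 first, which is the routing the test's follower-read counters assert on.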