Skip to content

Commit

Permalink
kv: ensure secondary tenants route follower reads to the closest replica
Browse files Browse the repository at this point in the history
The dist sender uses node locality information to rank replicas of a
range by latency. Previously, this node locality information was read
off a node descriptor available in Gossip. Unfortunately, secondary
tenants do not have access to Gossip, and as such, would end up
randomizing this list of replicas. This manifested itself through
unpredictable latencies when running follower reads.

We're no longer susceptible to this hazard with this patch. This is done
by eschewing the need of a node descriptor from gossip in the
DistSender; instead, we now instantiate the DistSender with locality
information.

However, we do still use Gossip to get the current node's
ID when ranking replicas. This is done to ascertain if there is a local
replica, and if there is, to always route to it. Unfortunately, because
secondary tenants don't have access to Gossip, they can't conform to
these semantics. They're susceptible to a hazard where a request may
be routed to another replica in the same locality tier as the client
even though the client has a local replica as well. This shouldn't be
a concern in practice given the diversity heuristic.

Resolves #81000

Release note (bug fix): fix an issue where secondary tenants could
route follower reads to a random, far away replica instead of one
closer.
  • Loading branch information
arulajmani committed Aug 9, 2022
1 parent 65d6eb9 commit a1fca9e
Show file tree
Hide file tree
Showing 12 changed files with 264 additions and 27 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_test(
embed = [":kvfollowerreadsccl"],
deps = [
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/utilccl",
"//pkg/keys",
"//pkg/kv",
Expand All @@ -57,6 +58,7 @@ go_test(
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
Expand Down
206 changes: 206 additions & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ package kvfollowerreadsccl

import (
"context"
gosql "database/sql"
"fmt"
"math"
"net/url"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
// Blank import kvtenantccl so that we can create a tenant.
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand All @@ -24,10 +29,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
Expand Down Expand Up @@ -690,3 +697,202 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
require.NoError(t, err)
require.Greater(t, followerReadsCountAfter, followerReadsCountBefore)
}

// TestSecondaryTenantFollowerReadsRouting ensures that secondary tenants route
// their requests to the nearest replica. The test runs two versions -- one
// where accurate latency information between nodes is available and another
// where it needs to be estimated using node localities.
func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
defer leaktest.AfterTest(t)()
defer utilccl.TestingEnableEnterprise()()

testutils.RunTrueAndFalse(t, "valid-latency-func", func(t *testing.T, validLatencyFunc bool) {
const numNodes = 4

serverArgs := make(map[int]base.TestServerArgs)
localities := make(map[int]roachpb.Locality)
for i := 0; i < numNodes; i++ {
regionName := fmt.Sprintf("region_%d", i)
if i == 3 {
// Make it such that n4 and n2 are in the same region. Below, we'll
// expect a follower read from n4 to be served by n2 because they're
// in the same locality (when validLatencyFunc is false).
regionName = fmt.Sprintf("region_%d", 1)
}
locality := roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: regionName}},
}
localities[i] = locality
serverArgs[i] = base.TestServerArgs{
Locality: localities[i],
DisableDefaultTestTenant: true, // we'll create one ourselves below.
}
}
tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: serverArgs,
})
ctx := context.Background()
defer tc.Stopper().Stop(ctx)

historicalQuery := `SELECT * FROM t.test AS OF SYSTEM TIME follower_read_timestamp() WHERE k=2`
recCh := make(chan tracingpb.Recording, 1)

var tenants [numNodes]serverutils.TestTenantInterface
for i := 0; i < numNodes; i++ {
knobs := base.TestingKnobs{}
if i == 3 { // n4
knobs = base.TestingKnobs{
KVClient: &kvcoord.ClientTestingKnobs{
DontConsiderConnHealth: true,
// For the validLatencyFunc=true version of the test, the client
// pretends to have a low latency connection to n2. As a result, we
// expect n2 to be used for follower reads originating from n4.
//
// For the variant where no latency information is available, we
// expect n2 to serve follower reads as well, but because it
// is in the same locality as the client.
LatencyFunc: func(addr string) (time.Duration, bool) {
if !validLatencyFunc {
return 0, false
}
if addr == tc.Server(1).RPCAddr() {
return time.Millisecond, true
}
return 100 * time.Millisecond, true
},
},
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(trace tracingpb.Recording, stmt string) {
if stmt == historicalQuery {
recCh <- trace
}
},
},
}
}
tt, err := tc.Server(i).StartTenant(ctx, base.TestTenantArgs{
TenantID: serverutils.TestTenantID(),
Locality: localities[i],
TestingKnobs: knobs,
})
require.NoError(t, err)
tenants[i] = tt
}

// Speed up closing of timestamps in order to sleep less below before we can
// use follower_read_timestamp(). Note that we need to override the setting
// for the tenant as well, because the builtin is run in the tenant's sql pod.
systemSQL := sqlutils.MakeSQLRunner(tc.Conns[0])
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`)
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`)
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.1s'`)
systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`)
systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`)
systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.1s'`)
// We're making assertions on traces collected by the tenant using log lines
// in KV so we must ensure they're not redacted.
systemSQL.Exec(t, `SET CLUSTER SETTING server.secondary_tenants.redact_trace = 'false'`)

// Wait until all tenant servers are aware of the setting override.
testutils.SucceedsSoon(t, func() error {
settingNames := []string{
"kv.closed_timestamp.target_duration", "kv.closed_timestamp.side_transport_interval", "kv.closed_timestamp.propagation_slack",
}
for _, settingName := range settingNames {
for i := 0; i < numNodes; i++ {
pgURL, cleanup := sqlutils.PGUrl(t, tenants[i].SQLAddr(), "Tenant", url.User(username.RootUser))
defer cleanup()
db, err := gosql.Open("postgres", pgURL.String())
if err != nil {
t.Fatal(err)
}
defer db.Close()

var val string
err = db.QueryRow(
fmt.Sprintf("SHOW CLUSTER SETTING %s", settingName),
).Scan(&val)
require.NoError(t, err)
if val != "00:00:00.1" {
return errors.Errorf("tenant server %d is still waiting for %s update: currently %s",
i,
settingName,
val,
)
}
}
}
return nil
})

pgURL, cleanupPGUrl := sqlutils.PGUrl(
t, tenants[3].SQLAddr(), "Tenant", url.User(username.RootUser),
)
defer cleanupPGUrl()
tenantSQLDB, err := gosql.Open("postgres", pgURL.String())
require.NoError(t, err)
defer tenantSQLDB.Close()
tenantSQL := sqlutils.MakeSQLRunner(tenantSQLDB)

tenantSQL.Exec(t, `CREATE DATABASE t`)
tenantSQL.Exec(t, `CREATE TABLE t.test (k INT PRIMARY KEY)`)

startKey := keys.MakeSQLCodec(serverutils.TestTenantID()).TenantPrefix()
tc.AddVotersOrFatal(t, startKey, tc.Target(1), tc.Target(2))
desc := tc.LookupRangeOrFatal(t, startKey)
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
}, desc.Replicas().Descriptors())

// Sleep so that we can perform follower reads. The read timestamp needs to be
// above the timestamp when the table was created.
log.Infof(ctx, "test sleeping for the follower read timestamps to pass the table creation timestamp...")
time.Sleep(300 * time.Millisecond)
log.Infof(ctx, "test sleeping... done")

getFollowerReadCounts := func() [numNodes]int64 {
var counts [numNodes]int64
for i := range tc.Servers {
err := tc.Servers[i].Stores().VisitStores(func(s *kvserver.Store) error {
counts[i] = s.Metrics().FollowerReadsCount.Count()
return nil
})
require.NoError(t, err)
}
return counts
}

// Check that the cache was indeed populated.
tenantSQL.Exec(t, `SELECT * FROM t.test WHERE k = 1`)
tablePrefix := keys.MustAddr(keys.MakeSQLCodec(serverutils.TestTenantID()).TenantPrefix())
cache := tenants[3].DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
entry := cache.GetCached(ctx, tablePrefix, false /* inverted */)
require.NotNil(t, entry)
require.False(t, entry.Lease().Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
}, entry.Desc().Replicas().Descriptors())

followerReadCountsBefore := getFollowerReadCounts()
tenantSQL.Exec(t, historicalQuery)
followerReadsCountsAfter := getFollowerReadCounts()

rec := <-recCh
// Look at the trace and check that we've served a follower read.
require.True(t, kv.OnlyFollowerReads(rec), "query was served through follower reads: %s", rec)

for i := 0; i < numNodes; i++ {
if i == 1 { // n2
require.Greater(t, followerReadsCountsAfter[i], followerReadCountsBefore[i])
continue
}
require.Equal(t, followerReadsCountsAfter[i], followerReadCountsBefore[i])
}
})
}
16 changes: 13 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,11 @@ type DistSender struct {
// LatencyFunc is used to estimate the latency to other nodes.
latencyFunc LatencyFunc

// locality is the description of the topography of the server on which the
// DistSender is running. It is used to estimate the latency to other nodes
// in the absence of a latency function.
locality roachpb.Locality

// If set, the DistSender will try the replicas in the order they appear in
// the descriptor, instead of trying to reorder them by latency. The knob
// only applies to requests sent with the LEASEHOLDER routing policy.
Expand Down Expand Up @@ -386,6 +391,10 @@ type DistSenderConfig struct {
FirstRangeProvider FirstRangeProvider
RangeDescriptorDB rangecache.RangeDescriptorDB

// Locality is the description of the topography of the server on which the
// DistSender is running.
Locality roachpb.Locality

// KVInterceptor is set for tenants; when set, information about all
// BatchRequests and BatchResponses are passed through this interceptor, which
// can potentially throttle requests.
Expand All @@ -405,6 +414,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
nodeDescs: cfg.NodeDescs,
metrics: makeDistSenderMetrics(),
kvInterceptor: cfg.KVInterceptor,
locality: cfg.Locality,
}
if ds.st == nil {
ds.st = cluster.MakeTestingClusterSettings()
Expand Down Expand Up @@ -545,7 +555,7 @@ func (ds *DistSender) FirstRange() (*roachpb.RangeDescriptor, error) {
// getNodeID attempts to return the local node ID. It returns 0 if the DistSender
// does not have access to the Gossip network.
func (ds *DistSender) getNodeID() roachpb.NodeID {
// TODO(nvanbenschoten): open an issue about the effect of this.
// TODO(arul): Open a new issue about the new effect of this.
g, ok := ds.nodeDescs.(*gossip.Gossip)
if !ok {
return 0
Expand Down Expand Up @@ -1959,7 +1969,7 @@ func (ds *DistSender) sendToReplicas(
// First order by latency, then move the leaseholder to the front of the
// list, if it is known.
if !ds.dontReorderReplicas {
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc)
replicas.OptimizeReplicaOrder(ds.getNodeID(), ds.latencyFunc, ds.locality)
}

idx := -1
Expand All @@ -1978,7 +1988,7 @@ func (ds *DistSender) sendToReplicas(
case roachpb.RoutingPolicy_NEAREST:
// Order by latency.
log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc)
replicas.OptimizeReplicaOrder(ds.getNodeID(), ds.latencyFunc, ds.locality)

default:
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (ds *DistSender) singleRangeFeed(
if err != nil {
return args.Timestamp, err
}
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)
replicas.OptimizeReplicaOrder(ds.getNodeID(), latencyFn, ds.locality)
// The RangeFeed is not used for system critical traffic so use a DefaultClass
// connection regardless of the range.
opts := SendOptions{class: connectionClass(&ds.st.SV)}
Expand Down
24 changes: 13 additions & 11 deletions pkg/kv/kvclient/kvcoord/replica_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,21 +188,23 @@ type LatencyFunc func(string) (time.Duration, bool)
// they're to be used for sending RPCs (meaning in the order in which
// they'll be probed for the lease). Lower latency and "closer"
// (matching in more attributes) replicas are ordered first. If the
// current node is a replica, then it'll be the first one.
// current node has a replica (and the current node's ID is supplied)
// then it'll be the first one.
//
// nodeDesc is the descriptor of the current node. It can be nil, in
// which case information about the current descriptor is not used in
// optimizing the order.
// nodeID is the ID of the current node the current node. It can be 0, in which
// case information about the current node is not used in optimizing the order.
// Similarly, latencyFn can be nil, in which case it will not be used.
//
// Note that this method is not concerned with any information the
// node might have about who the lease holder might be. If the
// leaseholder is known by the caller, the caller will move it to the
// front if appropriate.
func (rs ReplicaSlice) OptimizeReplicaOrder(
nodeDesc *roachpb.NodeDescriptor, latencyFn LatencyFunc,
nodeID roachpb.NodeID, latencyFn LatencyFunc, locality roachpb.Locality,
) {
// If we don't know which node we're on, send the RPCs randomly.
if nodeDesc == nil {
// If we don't know which node we're on or its locality, and we don't have
// latency information to other nodes, send the RPCs randomly.
if nodeID == 0 && latencyFn == nil && len(locality.Tiers) == 0 {
shuffle.Shuffle(rs)
return
}
Expand All @@ -214,10 +216,10 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
return false // i == j
}
// Replicas on the local node sort first.
if rs[i].NodeID == nodeDesc.NodeID {
if rs[i].NodeID == nodeID {
return true // i < j
}
if rs[j].NodeID == nodeDesc.NodeID {
if rs[j].NodeID == nodeID {
return false // j < i
}

Expand All @@ -228,8 +230,8 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
return latencyI < latencyJ
}
}
attrMatchI := localityMatch(nodeDesc.Locality.Tiers, rs[i].locality())
attrMatchJ := localityMatch(nodeDesc.Locality.Tiers, rs[j].locality())
attrMatchI := localityMatch(locality.Tiers, rs[i].locality())
attrMatchJ := localityMatch(locality.Tiers, rs[j].locality())
// Longer locality matches sort first (the assumption is that
// they'll have better latencies).
return attrMatchI > attrMatchJ
Expand Down
Loading

0 comments on commit a1fca9e

Please sign in to comment.