Skip to content

Commit

Permalink
kv: ensure secondary tenants route follower reads to the closest replica
Browse files Browse the repository at this point in the history
The dist sender uses node locality information to rank replicas of a
range by latency. Previously, this node locality information was read
off a node descriptor available in Gossip. Unfortunately, secondary
tenants do not have access to Gossip, and as such, would end up
randomizing this list of replicas. This manifested itself through
unpredictable latencies when running follower reads.

We're no longer susceptible to this hazard with this patch. This is done
by eschewing the need of a node descriptor from gossip in the
DistSender; instead, we now instantiate the DistSender with locality
information.

However, we do still use Gossip to get the current node's
ID when ranking replicas. This is done to ascertain if there is a local
replica, and if there is, to always route to it. Unfortunately, because
secondary tenants don't have access to Gossip, they can't conform to
these semantics. They're susceptible to a hazard where a request may
be routed to another replica in the same locality tier as the client
even though the client has a local replica as well. This shouldn't be
a concern in practice given the diversity heuristic. It also shouldn't
be a concern given tenant SQL pods don't run in process with KV nodes
today.

Resolves #81000

Release note (bug fix): fix an issue where secondary tenants could
route follower reads to a random, far away replica instead of one
closer.
  • Loading branch information
arulajmani committed Aug 14, 2022
1 parent a7d5abf commit 3d87dde
Show file tree
Hide file tree
Showing 13 changed files with 275 additions and 44 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_test(
embed = [":kvfollowerreadsccl"],
deps = [
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/utilccl",
"//pkg/keys",
"//pkg/kv",
Expand All @@ -57,6 +58,7 @@ go_test(
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
Expand Down
208 changes: 208 additions & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ package kvfollowerreadsccl

import (
"context"
gosql "database/sql"
"fmt"
"math"
"net/url"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
// Blank import kvtenantccl so that we can create a tenant.
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand All @@ -24,10 +29,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
Expand Down Expand Up @@ -694,3 +701,204 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
require.NoError(t, err)
require.Greater(t, followerReadsCountAfter, followerReadsCountBefore)
}

// TestSecondaryTenantFollowerReadsRouting ensures that secondary tenants route
// their requests to the nearest replica. The test runs two versions -- one
// where accurate latency information between nodes is available and another
// where it needs to be estimated using node localities.
func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
defer leaktest.AfterTest(t)()
defer utilccl.TestingEnableEnterprise()()

skip.UnderStressRace(t, "times out")

testutils.RunTrueAndFalse(t, "valid-latency-func", func(t *testing.T, validLatencyFunc bool) {
const numNodes = 4

serverArgs := make(map[int]base.TestServerArgs)
localities := make(map[int]roachpb.Locality)
for i := 0; i < numNodes; i++ {
regionName := fmt.Sprintf("region_%d", i)
if i == 3 {
// Make it such that n4 and n2 are in the same region. Below, we'll
// expect a follower read from n4 to be served by n2 because they're
// in the same locality (when validLatencyFunc is false).
regionName = fmt.Sprintf("region_%d", 1)
}
locality := roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: regionName}},
}
localities[i] = locality
serverArgs[i] = base.TestServerArgs{
Locality: localities[i],
DisableDefaultTestTenant: true, // we'll create one ourselves below.
}
}
tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: serverArgs,
})
ctx := context.Background()
defer tc.Stopper().Stop(ctx)

historicalQuery := `SELECT * FROM t.test AS OF SYSTEM TIME follower_read_timestamp() WHERE k=2`
recCh := make(chan tracingpb.Recording, 1)

var tenants [numNodes]serverutils.TestTenantInterface
for i := 0; i < numNodes; i++ {
knobs := base.TestingKnobs{}
if i == 3 { // n4
knobs = base.TestingKnobs{
KVClient: &kvcoord.ClientTestingKnobs{
DontConsiderConnHealth: true,
// For the validLatencyFunc=true version of the test, the client
// pretends to have a low latency connection to n2. As a result, we
// expect n2 to be used for follower reads originating from n4.
//
// For the variant where no latency information is available, we
// expect n2 to serve follower reads as well, but because it
// is in the same locality as the client.
LatencyFunc: func(addr string) (time.Duration, bool) {
if !validLatencyFunc {
return 0, false
}
if addr == tc.Server(1).RPCAddr() {
return time.Millisecond, true
}
return 100 * time.Millisecond, true
},
},
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(trace tracingpb.Recording, stmt string) {
if stmt == historicalQuery {
recCh <- trace
}
},
},
}
}
tt, err := tc.Server(i).StartTenant(ctx, base.TestTenantArgs{
TenantID: serverutils.TestTenantID(),
Locality: localities[i],
TestingKnobs: knobs,
})
require.NoError(t, err)
tenants[i] = tt
}

// Speed up closing of timestamps in order to sleep less below before we can
// use follower_read_timestamp(). Note that we need to override the setting
// for the tenant as well, because the builtin is run in the tenant's sql pod.
systemSQL := sqlutils.MakeSQLRunner(tc.Conns[0])
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`)
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`)
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.1s'`)
systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`)
systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`)
systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.1s'`)
// We're making assertions on traces collected by the tenant using log lines
// in KV so we must ensure they're not redacted.
systemSQL.Exec(t, `SET CLUSTER SETTING server.secondary_tenants.redact_trace.enabled = 'false'`)

// Wait until all tenant servers are aware of the setting override.
testutils.SucceedsSoon(t, func() error {
settingNames := []string{
"kv.closed_timestamp.target_duration", "kv.closed_timestamp.side_transport_interval", "kv.closed_timestamp.propagation_slack",
}
for _, settingName := range settingNames {
for i := 0; i < numNodes; i++ {
pgURL, cleanup := sqlutils.PGUrl(t, tenants[i].SQLAddr(), "Tenant", url.User(username.RootUser))
defer cleanup()
db, err := gosql.Open("postgres", pgURL.String())
if err != nil {
t.Fatal(err)
}
defer db.Close()

var val string
err = db.QueryRow(
fmt.Sprintf("SHOW CLUSTER SETTING %s", settingName),
).Scan(&val)
require.NoError(t, err)
if val != "00:00:00.1" {
return errors.Errorf("tenant server %d is still waiting for %s update: currently %s",
i,
settingName,
val,
)
}
}
}
return nil
})

pgURL, cleanupPGUrl := sqlutils.PGUrl(
t, tenants[3].SQLAddr(), "Tenant", url.User(username.RootUser),
)
defer cleanupPGUrl()
tenantSQLDB, err := gosql.Open("postgres", pgURL.String())
require.NoError(t, err)
defer tenantSQLDB.Close()
tenantSQL := sqlutils.MakeSQLRunner(tenantSQLDB)

tenantSQL.Exec(t, `CREATE DATABASE t`)
tenantSQL.Exec(t, `CREATE TABLE t.test (k INT PRIMARY KEY)`)

startKey := keys.MakeSQLCodec(serverutils.TestTenantID()).TenantPrefix()
tc.AddVotersOrFatal(t, startKey, tc.Target(1), tc.Target(2))
desc := tc.LookupRangeOrFatal(t, startKey)
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
}, desc.Replicas().Descriptors())

// Sleep so that we can perform follower reads. The read timestamp needs to be
// above the timestamp when the table was created.
log.Infof(ctx, "test sleeping for the follower read timestamps to pass the table creation timestamp...")
time.Sleep(500 * time.Millisecond)
log.Infof(ctx, "test sleeping... done")

getFollowerReadCounts := func() [numNodes]int64 {
var counts [numNodes]int64
for i := range tc.Servers {
err := tc.Servers[i].Stores().VisitStores(func(s *kvserver.Store) error {
counts[i] = s.Metrics().FollowerReadsCount.Count()
return nil
})
require.NoError(t, err)
}
return counts
}

// Check that the cache was indeed populated.
tenantSQL.Exec(t, `SELECT * FROM t.test WHERE k = 1`)
tablePrefix := keys.MustAddr(keys.MakeSQLCodec(serverutils.TestTenantID()).TenantPrefix())
cache := tenants[3].DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
entry := cache.GetCached(ctx, tablePrefix, false /* inverted */)
require.NotNil(t, entry)
require.False(t, entry.Lease().Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
}, entry.Desc().Replicas().Descriptors())

followerReadCountsBefore := getFollowerReadCounts()
tenantSQL.Exec(t, historicalQuery)
followerReadsCountsAfter := getFollowerReadCounts()

rec := <-recCh
// Look at the trace and check that we've served a follower read.
require.True(t, kv.OnlyFollowerReads(rec), "query was served through follower reads: %s", rec)

for i := 0; i < numNodes; i++ {
if i == 1 { // n2
require.Greater(t, followerReadsCountsAfter[i], followerReadCountsBefore[i])
continue
}
require.Equal(t, followerReadsCountsAfter[i], followerReadCountsBefore[i])
}
})
}
21 changes: 18 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,11 @@ type DistSender struct {
// LatencyFunc is used to estimate the latency to other nodes.
latencyFunc LatencyFunc

// locality is the description of the topography of the server on which the
// DistSender is running. It is used to estimate the latency to other nodes
// in the absence of a latency function.
locality roachpb.Locality

// If set, the DistSender will try the replicas in the order they appear in
// the descriptor, instead of trying to reorder them by latency. The knob
// only applies to requests sent with the LEASEHOLDER routing policy.
Expand Down Expand Up @@ -386,6 +391,10 @@ type DistSenderConfig struct {
FirstRangeProvider FirstRangeProvider
RangeDescriptorDB rangecache.RangeDescriptorDB

// Locality is the description of the topography of the server on which the
// DistSender is running.
Locality roachpb.Locality

// KVInterceptor is set for tenants; when set, information about all
// BatchRequests and BatchResponses are passed through this interceptor, which
// can potentially throttle requests.
Expand All @@ -405,6 +414,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
nodeDescs: cfg.NodeDescs,
metrics: makeDistSenderMetrics(),
kvInterceptor: cfg.KVInterceptor,
locality: cfg.Locality,
}
if ds.st == nil {
ds.st = cluster.MakeTestingClusterSettings()
Expand Down Expand Up @@ -545,7 +555,12 @@ func (ds *DistSender) FirstRange() (*roachpb.RangeDescriptor, error) {
// getNodeID attempts to return the local node ID. It returns 0 if the DistSender
// does not have access to the Gossip network.
func (ds *DistSender) getNodeID() roachpb.NodeID {
// TODO(nvanbenschoten): open an issue about the effect of this.
// Today, secondary tenants don't run in process with KV instances, so they
// don't have access to the Gossip network. The DistSender uses the node ID to
// preferentially route requests to a local replica (if one exists). Not
// knowing the node ID, and thus not being able to take advantage of this
// optimization is okay, given tenants not running in-process with KV
// instances have no such optimization to take advantage of to begin with.
g, ok := ds.nodeDescs.(*gossip.Gossip)
if !ok {
return 0
Expand Down Expand Up @@ -1968,7 +1983,7 @@ func (ds *DistSender) sendToReplicas(
// First order by latency, then move the leaseholder to the front of the
// list, if it is known.
if !ds.dontReorderReplicas {
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc)
replicas.OptimizeReplicaOrder(ds.getNodeID(), ds.latencyFunc, ds.locality)
}

idx := -1
Expand All @@ -1987,7 +2002,7 @@ func (ds *DistSender) sendToReplicas(
case roachpb.RoutingPolicy_NEAREST:
// Order by latency.
log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc)
replicas.OptimizeReplicaOrder(ds.getNodeID(), ds.latencyFunc, ds.locality)

default:
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (ds *DistSender) singleRangeFeed(
if err != nil {
return args.Timestamp, err
}
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)
replicas.OptimizeReplicaOrder(ds.getNodeID(), latencyFn, ds.locality)
// The RangeFeed is not used for system critical traffic so use a DefaultClass
// connection regardless of the range.
opts := SendOptions{class: connectionClass(&ds.st.SV)}
Expand Down
21 changes: 4 additions & 17 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,25 +389,15 @@ func TestSendRPCOrder(t *testing.T) {
Settings: cluster.MakeTestingClusterSettings(),
}

ds := NewDistSender(cfg)

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
verifyCall = makeVerifier(tc.expReplica)

{
// The local node needs to get its attributes during sendRPC.
nd := &roachpb.NodeDescriptor{
NodeID: 6,
Address: util.MakeUnresolvedAddr("tcp", "invalid.invalid:6"),
Locality: roachpb.Locality{
Tiers: tc.tiers,
},
}
g.NodeID.Reset(nd.NodeID)
err := g.SetNodeDescriptor(nd)
require.NoError(t, err)
g.NodeID.Reset(6)
cfg.Locality = roachpb.Locality{
Tiers: tc.tiers,
}
ds := NewDistSender(cfg)

ds.rangeCache.Clear()
var lease roachpb.Lease
Expand All @@ -419,9 +409,6 @@ func TestSendRPCOrder(t *testing.T) {
Lease: lease,
})

// Kill the cached NodeDescriptor, enforcing a lookup from Gossip.
ds.nodeDescriptor = nil

// Issue the request.
header := roachpb.Header{
RangeID: rangeID, // Not used in this test, but why not.
Expand Down
Loading

0 comments on commit 3d87dde

Please sign in to comment.