kv: ensure secondary tenants route follower reads to the closest replica #85853

Merged · 5 commits · Aug 14, 2022
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -86,6 +86,7 @@
<tr><td><code>server.oidc_authentication.redirect_url</code></td><td>string</td><td><code>https://localhost:8080/oidc/v1/callback</code></td><td>sets OIDC redirect URL via a URL string or a JSON string containing a required `redirect_urls` key with an object that maps from region keys to URL strings (URLs should point to your load balancer and must route to the path /oidc/v1/callback) </td></tr>
<tr><td><code>server.oidc_authentication.scopes</code></td><td>string</td><td><code>openid</code></td><td>sets OIDC scopes to include with authentication request (space delimited list of strings, required to start with `openid`)</td></tr>
<tr><td><code>server.rangelog.ttl</code></td><td>duration</td><td><code>720h0m0s</code></td><td>if nonzero, range log entries older than this duration are deleted every 10m0s. Should not be lowered below 24 hours.</td></tr>
<tr><td><code>server.secondary_tenants.redact_trace.enabled</code></td><td>boolean</td><td><code>true</code></td><td>controls if server side traces are redacted for tenant operations</td></tr>
<tr><td><code>server.shutdown.connection_wait</code></td><td>duration</td><td><code>0s</code></td><td>the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td></tr>
<tr><td><code>server.shutdown.drain_wait</code></td><td>duration</td><td><code>0s</code></td><td>the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.)</td></tr>
<tr><td><code>server.shutdown.lease_transfer_wait</code></td><td>duration</td><td><code>5s</code></td><td>the timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td></tr>
2 changes: 2 additions & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
@@ -45,6 +45,7 @@ go_test(
embed = [":kvfollowerreadsccl"],
deps = [
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/utilccl",
"//pkg/keys",
"//pkg/kv",
@@ -57,6 +58,7 @@ go_test(
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
1 change: 1 addition & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/boundedstaleness_test.go
@@ -273,6 +273,7 @@ func TestBoundedStalenessDataDriven(t *testing.T) {
for i := 0; i < numNodes; i++ {
i := i
clusterArgs.ServerArgsPerNode[i] = base.TestServerArgs{
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(trace tracingpb.Recording, stmt string) {
216 changes: 214 additions & 2 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -10,11 +10,16 @@ package kvfollowerreadsccl

import (
"context"
gosql "database/sql"
"fmt"
"math"
"net/url"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
// Blank import kvtenantccl so that we can create a tenant.
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand All @@ -24,10 +29,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
@@ -568,13 +575,17 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
tc := testcluster.StartTestCluster(t, 4,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{UseDatabase: "t"},
ServerArgs: base.TestServerArgs{
DisableDefaultTestTenant: true,
UseDatabase: "t",
},
// n4 pretends to have low latency to n2 and n3, so that it tries to use
// them for follower reads.
// Also, we're going to collect a trace of the test's final query.
ServerArgsPerNode: map[int]base.TestServerArgs{
3: {
UseDatabase: "t",
DisableDefaultTestTenant: true,
UseDatabase: "t",
Knobs: base.TestingKnobs{
KVClient: &kvcoord.ClientTestingKnobs{
// Inhibit the checking of connection health done by the
@@ -690,3 +701,204 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
require.NoError(t, err)
require.Greater(t, followerReadsCountAfter, followerReadsCountBefore)
}

// TestSecondaryTenantFollowerReadsRouting ensures that secondary tenants route
// their requests to the nearest replica. The test runs two versions -- one
// where accurate latency information between nodes is available and another
// where it needs to be estimated using node localities.
func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
defer leaktest.AfterTest(t)()
defer utilccl.TestingEnableEnterprise()()

skip.UnderStressRace(t, "times out")

testutils.RunTrueAndFalse(t, "valid-latency-func", func(t *testing.T, validLatencyFunc bool) {
const numNodes = 4

serverArgs := make(map[int]base.TestServerArgs)
localities := make(map[int]roachpb.Locality)
for i := 0; i < numNodes; i++ {
regionName := fmt.Sprintf("region_%d", i)
if i == 3 {
// Make it such that n4 and n2 are in the same region. Below, we'll
// expect a follower read from n4 to be served by n2 because they're
// in the same locality (when validLatencyFunc is false).
regionName = fmt.Sprintf("region_%d", 1)
}
locality := roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: regionName}},
}
localities[i] = locality
serverArgs[i] = base.TestServerArgs{
Locality: localities[i],
DisableDefaultTestTenant: true, // we'll create one ourselves below.
}
}
tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: serverArgs,
})
ctx := context.Background()
defer tc.Stopper().Stop(ctx)

historicalQuery := `SELECT * FROM t.test AS OF SYSTEM TIME follower_read_timestamp() WHERE k=2`
recCh := make(chan tracingpb.Recording, 1)

var tenants [numNodes]serverutils.TestTenantInterface
for i := 0; i < numNodes; i++ {
knobs := base.TestingKnobs{}
if i == 3 { // n4
knobs = base.TestingKnobs{
KVClient: &kvcoord.ClientTestingKnobs{
DontConsiderConnHealth: true,
// For the validLatencyFunc=true version of the test, the client
// pretends to have a low latency connection to n2. As a result, we
// expect n2 to be used for follower reads originating from n4.
//
// For the variant where no latency information is available, we
// expect n2 to serve follower reads as well, but because it
// is in the same locality as the client.
LatencyFunc: func(addr string) (time.Duration, bool) {
if !validLatencyFunc {
return 0, false
}
if addr == tc.Server(1).RPCAddr() {
return time.Millisecond, true
}
return 100 * time.Millisecond, true
},
},
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(trace tracingpb.Recording, stmt string) {
if stmt == historicalQuery {
recCh <- trace
}
},
},
}
}
tt, err := tc.Server(i).StartTenant(ctx, base.TestTenantArgs{
TenantID: serverutils.TestTenantID(),
Locality: localities[i],
TestingKnobs: knobs,
})
require.NoError(t, err)
tenants[i] = tt
}

// Speed up closing of timestamps in order to sleep less below before we can
// use follower_read_timestamp(). Note that we need to override the setting
// for the tenant as well, because the builtin is run in the tenant's sql pod.
systemSQL := sqlutils.MakeSQLRunner(tc.Conns[0])
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`)
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`)
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.1s'`)
systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`)
systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`)
systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.1s'`)
// We're making assertions on traces collected by the tenant using log lines
// in KV so we must ensure they're not redacted.
systemSQL.Exec(t, `SET CLUSTER SETTING server.secondary_tenants.redact_trace.enabled = 'false'`)

// Wait until all tenant servers are aware of the setting override.
testutils.SucceedsSoon(t, func() error {
settingNames := []string{
"kv.closed_timestamp.target_duration", "kv.closed_timestamp.side_transport_interval", "kv.closed_timestamp.propagation_slack",
}
for _, settingName := range settingNames {
for i := 0; i < numNodes; i++ {
pgURL, cleanup := sqlutils.PGUrl(t, tenants[i].SQLAddr(), "Tenant", url.User(username.RootUser))
defer cleanup()
db, err := gosql.Open("postgres", pgURL.String())
if err != nil {
t.Fatal(err)
}
defer db.Close()

var val string
err = db.QueryRow(
fmt.Sprintf("SHOW CLUSTER SETTING %s", settingName),
).Scan(&val)
require.NoError(t, err)
if val != "00:00:00.1" {
return errors.Errorf("tenant server %d is still waiting for %s update: currently %s",
i,
settingName,
val,
)
}
}
}
return nil
})

pgURL, cleanupPGUrl := sqlutils.PGUrl(
t, tenants[3].SQLAddr(), "Tenant", url.User(username.RootUser),
)
defer cleanupPGUrl()
tenantSQLDB, err := gosql.Open("postgres", pgURL.String())
require.NoError(t, err)
defer tenantSQLDB.Close()
tenantSQL := sqlutils.MakeSQLRunner(tenantSQLDB)

tenantSQL.Exec(t, `CREATE DATABASE t`)
tenantSQL.Exec(t, `CREATE TABLE t.test (k INT PRIMARY KEY)`)

startKey := keys.MakeSQLCodec(serverutils.TestTenantID()).TenantPrefix()
tc.AddVotersOrFatal(t, startKey, tc.Target(1), tc.Target(2))
desc := tc.LookupRangeOrFatal(t, startKey)
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
}, desc.Replicas().Descriptors())

// Sleep so that we can perform follower reads. The read timestamp needs to be
// above the timestamp when the table was created.
log.Infof(ctx, "test sleeping for the follower read timestamps to pass the table creation timestamp...")
time.Sleep(500 * time.Millisecond)
log.Infof(ctx, "test sleeping... done")

getFollowerReadCounts := func() [numNodes]int64 {
var counts [numNodes]int64
for i := range tc.Servers {
err := tc.Servers[i].Stores().VisitStores(func(s *kvserver.Store) error {
counts[i] = s.Metrics().FollowerReadsCount.Count()
return nil
})
require.NoError(t, err)
}
return counts
}

// Check that the cache was indeed populated.
tenantSQL.Exec(t, `SELECT * FROM t.test WHERE k = 1`)
tablePrefix := keys.MustAddr(keys.MakeSQLCodec(serverutils.TestTenantID()).TenantPrefix())
cache := tenants[3].DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
entry := cache.GetCached(ctx, tablePrefix, false /* inverted */)
require.NotNil(t, entry)
require.False(t, entry.Lease().Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
}, entry.Desc().Replicas().Descriptors())

followerReadCountsBefore := getFollowerReadCounts()
tenantSQL.Exec(t, historicalQuery)
followerReadsCountsAfter := getFollowerReadCounts()

rec := <-recCh
// Look at the trace and check that we've served a follower read.
require.True(t, kv.OnlyFollowerReads(rec), "query was not served through follower reads: %s", rec)

for i := 0; i < numNodes; i++ {
if i == 1 { // n2
require.Greater(t, followerReadsCountsAfter[i], followerReadCountsBefore[i])
continue
}
require.Equal(t, followerReadsCountsAfter[i], followerReadCountsBefore[i])
}
})
}
55 changes: 18 additions & 37 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -336,6 +336,11 @@ type DistSender struct {
// LatencyFunc is used to estimate the latency to other nodes.
latencyFunc LatencyFunc

// locality is the description of the topography of the server on which the
// DistSender is running. It is used to estimate the latency to other nodes
// in the absence of a latency function.
locality roachpb.Locality

// If set, the DistSender will try the replicas in the order they appear in
// the descriptor, instead of trying to reorder them by latency. The knob
// only applies to requests sent with the LEASEHOLDER routing policy.
Expand Down Expand Up @@ -386,6 +391,10 @@ type DistSenderConfig struct {
FirstRangeProvider FirstRangeProvider
RangeDescriptorDB rangecache.RangeDescriptorDB

// Locality is the description of the topography of the server on which the
// DistSender is running.
Locality roachpb.Locality

// KVInterceptor is set for tenants; when set, information about all
// BatchRequests and BatchResponses are passed through this interceptor, which
// can potentially throttle requests.
@@ -405,6 +414,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
nodeDescs: cfg.NodeDescs,
metrics: makeDistSenderMetrics(),
kvInterceptor: cfg.KVInterceptor,
locality: cfg.Locality,
}
if ds.st == nil {
ds.st = cluster.MakeTestingClusterSettings()
@@ -545,48 +555,19 @@ func (ds *DistSender) FirstRange() (*roachpb.RangeDescriptor, error) {
// getNodeID attempts to return the local node ID. It returns 0 if the DistSender
// does not have access to the Gossip network.
func (ds *DistSender) getNodeID() roachpb.NodeID {
// TODO(nvanbenschoten): open an issue about the effect of this.
// Today, secondary tenants don't run in process with KV instances, so they
// don't have access to the Gossip network. The DistSender uses the node ID to
// preferentially route requests to a local replica (if one exists). Not
// knowing the node ID, and thus not being able to take advantage of this
// optimization is okay, given tenants not running in-process with KV
// instances have no such optimization to take advantage of to begin with.
g, ok := ds.nodeDescs.(*gossip.Gossip)
if !ok {
return 0
}
return g.NodeID.Get()
}

// getNodeDescriptor returns ds.nodeDescriptor, but makes an attempt to load
// it from the Gossip network if a nil value is found.
// We must jump through hoops here to get the node descriptor because it's not available
// until after the node has joined the gossip network and been allowed to initialize
// its stores.
func (ds *DistSender) getNodeDescriptor() *roachpb.NodeDescriptor {
if desc := atomic.LoadPointer(&ds.nodeDescriptor); desc != nil {
return (*roachpb.NodeDescriptor)(desc)
}
// TODO(nvanbenschoten): open an issue about the effect of this.
g, ok := ds.nodeDescs.(*gossip.Gossip)
if !ok {
return nil
}

ownNodeID := g.NodeID.Get()
if ownNodeID > 0 {
// TODO(tschottdorf): Consider instead adding the NodeID of the
// coordinator to the header, so we can get this from incoming
// requests. Just in case we want to mostly eliminate gossip here.
nodeDesc := &roachpb.NodeDescriptor{}
if err := g.GetInfoProto(gossip.MakeNodeIDKey(ownNodeID), nodeDesc); err == nil {
atomic.StorePointer(&ds.nodeDescriptor, unsafe.Pointer(nodeDesc))
return nodeDesc
}
}
if log.V(1) {
ctx := ds.AnnotateCtx(context.TODO())
log.Infof(ctx, "unable to determine this node's attributes for replica "+
"selection; node is most likely bootstrapping")
}
return nil
}

// CountRanges returns the number of ranges that encompass the given key span.
func (ds *DistSender) CountRanges(ctx context.Context, rs roachpb.RSpan) (int64, error) {
var count int64
@@ -1968,7 +1949,7 @@ func (ds *DistSender) sendToReplicas(
// First order by latency, then move the leaseholder to the front of the
// list, if it is known.
if !ds.dontReorderReplicas {
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc)
replicas.OptimizeReplicaOrder(ds.getNodeID(), ds.latencyFunc, ds.locality)
}

idx := -1
@@ -1987,7 +1968,7 @@
case roachpb.RoutingPolicy_NEAREST:
// Order by latency.
log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc)
replicas.OptimizeReplicaOrder(ds.getNodeID(), ds.latencyFunc, ds.locality)

default:
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
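Note on the locality plumbing above: the diff wires the server's locality into the DistSender and passes it to OptimizeReplicaOrder, but the ordering logic that consumes it lives outside the hunks shown here. The snippet below is a minimal, self-contained sketch (not the actual kvcoord implementation) of how replicas can be ranked by locality when no latency function is available, which is the situation for a secondary tenant's SQL pod: replicas sharing a longer prefix of locality tiers with the client sort first. The Tier, Locality, replicaInfo, matchDepth, and sortByLocalityMatch names are illustrative only.

package main

import (
	"fmt"
	"sort"
)

// Tier is a single key/value locality tier, e.g. region=region_1. It mirrors
// the shape of roachpb.Tier for illustration only.
type Tier struct {
	Key, Value string
}

// Locality is an ordered list of tiers, most significant first.
type Locality struct {
	Tiers []Tier
}

// matchDepth returns how many leading tiers two localities share. A larger
// value suggests the two nodes are topologically closer to each other.
func matchDepth(a, b Locality) int {
	depth := 0
	for i := 0; i < len(a.Tiers) && i < len(b.Tiers); i++ {
		if a.Tiers[i] != b.Tiers[i] {
			break
		}
		depth++
	}
	return depth
}

// replicaInfo is a hypothetical stand-in for a replica descriptor plus the
// locality of the node it lives on.
type replicaInfo struct {
	NodeID   int
	Locality Locality
}

// sortByLocalityMatch orders replicas so that those sharing more locality
// tiers with the client come first, approximating "closest replica" when no
// latency measurements exist.
func sortByLocalityMatch(replicas []replicaInfo, client Locality) {
	sort.SliceStable(replicas, func(i, j int) bool {
		return matchDepth(replicas[i].Locality, client) > matchDepth(replicas[j].Locality, client)
	})
}

func main() {
	region := func(name string) Locality {
		return Locality{Tiers: []Tier{{Key: "region", Value: name}}}
	}
	client := region("region_1") // the tenant pod's locality (n4 in the test above)
	replicas := []replicaInfo{
		{NodeID: 1, Locality: region("region_0")},
		{NodeID: 2, Locality: region("region_1")},
		{NodeID: 3, Locality: region("region_2")},
	}
	sortByLocalityMatch(replicas, client)
	fmt.Println(replicas[0].NodeID) // prints 2: the replica in the client's region sorts first
}

When a latency function is available it takes precedence over this locality comparison, which is why TestSecondaryTenantFollowerReadsRouting above exercises both the valid-latency-func=true and =false variants, as its comments describe.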
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -431,7 +431,7 @@ func (ds *DistSender) singleRangeFeed(
if err != nil {
return args.Timestamp, err
}
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)
replicas.OptimizeReplicaOrder(ds.getNodeID(), latencyFn, ds.locality)
// The RangeFeed is not used for system critical traffic so use a DefaultClass
// connection regardless of the range.
opts := SendOptions{class: connectionClass(&ds.st.SV)}