Skip to content

Commit

Permalink
sql,kv,followerreadsccl: enable follower reads for historical queries
Browse files Browse the repository at this point in the history
Follower reads are reads which can be served from any replica as opposed to just
the current lease holder. The foundation for this change was laid with the work
to introduce closed timestamps and to support follower reads at the replica
level. This change adds the required support to the sql and kv layers and
additionally exposes a new syntax to ease client adoption of the functionality.

The change adds the followerreadsccl package with logic to check when follower
reads are safe and to inject the functionality so that it can be packaged as an
enterprise feature.

Modifies `AS OF SYSTEM TIME` the semantics to allow for the evaluation of a new
builtin tentatively called `experimental_follower_read_timestamp()` in addition
to constant expressions. This new builtin ensures that an enterprise license
exists and then returns a time that can likely be used to read from a follower.

The change abstracts (and renames to the more appropriate replicaoracle) the
existing leaseHolderOracle in the distsqlplan package to allow a follower read
aware policy to be injected.

Lastly the change add to kv a site to inject a function for checking if follower
reads are safe and allowed given a cluster, settings, and batch request.

This change includes a high level roachtest which validates observable behavior
of performing follower reads by examining latencies for reads in a geo-
replicated setting.

Release note (enterprise change): Add support for performing sufficiently old
historical reads against closest replicas rather than leaseholders. A new
builtin function `experimental_follower_read_timestamp()` which can be used with
`AS OF SYSTEM TIME` clauses to generate a timestamp which is likely to be safe
for reads from a follower.
  • Loading branch information
ajwerner committed Feb 5, 2019
1 parent dc4c803 commit 70be833
Show file tree
Hide file tree
Showing 22 changed files with 1,380 additions and 277 deletions.
9 changes: 9 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,15 @@ significant than <code>element</code> to zero (or one, for day and month)</p>
<p>Compatible elements: year, quarter, month, week, hour, minute, second,
millisecond, microsecond.</p>
</span></td></tr>
<tr><td><code>experimental_follower_read_timestamp() &rarr; <a href="timestamp.html">timestamptz</a></code></td><td><span class="funcdesc"><p>Returns a timestamp which is very likely to be safe to perform
against a follower replica.</p>
<p>This function is intended to be used with an AS OF SYSTEM TIME clause to perform
historical reads against a time which is recent but sufficiently old for reads
to be performed against the closest replica as opposed to the currently
leaseholder for a given range.</p>
<p>Note that this function requires an enterprise license on a CCL distribution to
return without an error.</p>
</span></td></tr>
<tr><td><code>experimental_strftime(input: <a href="date.html">date</a>, extract_format: <a href="string.html">string</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>From <code>input</code>, extracts and formats the time as identified in <code>extract_format</code> using standard <code>strftime</code> notation (though not all formatting is supported).</p>
</span></td></tr>
<tr><td><code>experimental_strftime(input: <a href="timestamp.html">timestamp</a>, extract_format: <a href="string.html">string</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>From <code>input</code>, extracts and formats the time as identified in <code>extract_format</code> using standard <code>strftime</code> notation (though not all formatting is supported).</p>
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/ccl_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/buildccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/followerreadsccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/importccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/roleccl"
Expand Down
130 changes: 130 additions & 0 deletions pkg/ccl/followerreadsccl/followerreads.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

// Package followerreadsccl implements and injects the functionality needed to
// expose follower reads to clients.
package followerreadsccl

import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/closedts"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// followerReadMultiple is the multiple of kv.closed_timestmap.target_duration
// which the implementation of the follower read capable replica policy ought
// to use to determine if a request can be used for reading.
// FollowerReadMultiple is a hidden setting.
var followerReadMultiple = settings.RegisterValidatedFloatSetting(
"kv.follower_read.target_multiple",
"if above 1, encourages the distsender to perform a read against the "+
"closest replica if a request is older than kv.closed_timestamp.target_duration"+
" * (1 + kv.closed_timestamp.close_fraction * this) less a clock uncertainty "+
"interval. This value also is used to create follower_timestamp().",
3,
func(v float64) error {
if v < 1 {
return fmt.Errorf("%v is not >= 1", v)
}
return nil
},
)

func init() { followerReadMultiple.Hide() }

// getFollowerReadOffset returns the offset duration which should be used to as
// the offset from now to request a follower read. The same value less the clock
// uncertainty, then is used to determine at the kv layer if a query can use a
// follower read.
func getFollowerReadDuration(st *cluster.Settings) time.Duration {
targetMultiple := followerReadMultiple.Get(&st.SV)
targetDuration := closedts.TargetDuration.Get(&st.SV)
closeFraction := closedts.CloseFraction.Get(&st.SV)
return -1 * time.Duration(float64(targetDuration)*
(1+closeFraction*targetMultiple))
}

func checkEnterpriseEnabled(clusterID uuid.UUID, st *cluster.Settings) error {
org := sql.ClusterOrganization.Get(&st.SV)
return utilccl.CheckEnterpriseEnabled(st, clusterID, org, "follower reads")
}

func evalFollowerReadOffset(clusterID uuid.UUID, st *cluster.Settings) (time.Duration, error) {
if err := checkEnterpriseEnabled(clusterID, st); err != nil {
return 0, err
}
return getFollowerReadDuration(st), nil
}

// canUseFollowerRead determines if a query can be sent to a follower
func canUseFollowerRead(clusterID uuid.UUID, st *cluster.Settings, ts hlc.Timestamp) bool {
if !storage.FollowerReadsEnabled.Get(&st.SV) {
return false
}
threshold := (-1 * getFollowerReadDuration(st)) - base.DefaultMaxClockOffset
if timeutil.Since(ts.GoTime()) < threshold {
return false
}
return checkEnterpriseEnabled(clusterID, st) == nil
}

// canSendToFollower implements the logic for checking whether a batch request
// may be sent to a follower.
func canSendToFollower(clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest) bool {
return ba.IsReadOnly() && ba.Txn != nil &&
canUseFollowerRead(clusterID, st, ba.Txn.OrigTimestamp)
}

type oracleFactory struct {
clusterID *base.ClusterIDContainer
st *cluster.Settings

binPacking replicaoracle.OracleFactory
closest replicaoracle.OracleFactory
}

func newOracleFactory(cfg replicaoracle.Config) replicaoracle.OracleFactory {
return &oracleFactory{
clusterID: &cfg.RPCContext.ClusterID,
st: cfg.Settings,
binPacking: replicaoracle.NewOracleFactory(replicaoracle.BinPackingChoice, cfg),
closest: replicaoracle.NewOracleFactory(replicaoracle.ClosestChoice, cfg),
}
}

func (f oracleFactory) Oracle(txn *client.Txn) replicaoracle.Oracle {
if txn != nil && canUseFollowerRead(f.clusterID.Get(), f.st, txn.OrigTimestamp()) {
return f.closest.Oracle(txn)
}
return f.binPacking.Oracle(txn)
}

// followerReadAwareChoice is a leaseholder choosing policy that detects
// whether a query can be used with a follower read.
var followerReadAwareChoice = replicaoracle.RegisterPolicy(newOracleFactory)

func init() {
sql.ReplicaOraclePolicy = followerReadAwareChoice
builtins.EvalFollowerReadOffset = evalFollowerReadOffset
kv.CanSendToFollower = canSendToFollower
}
158 changes: 158 additions & 0 deletions pkg/ccl/followerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package followerreadsccl

import (
"context"
"reflect"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

const expectedFollowerReadOffset = -1 * (30 * (1 + .2*3)) * time.Second

func TestEvalFollowerReadOffset(t *testing.T) {
defer leaktest.AfterTest(t)()
disableEnterprise := utilccl.TestingEnableEnterprise()
defer disableEnterprise()
st := cluster.MakeTestingClusterSettings()
if offset, err := evalFollowerReadOffset(uuid.MakeV4(), st); err != nil {
t.Fatal(err)
} else if offset != expectedFollowerReadOffset {
t.Fatalf("expected %v, got %v", expectedFollowerReadOffset, offset)
}
disableEnterprise()
_, err := evalFollowerReadOffset(uuid.MakeV4(), st)
if !testutils.IsError(err, "requires an enterprise license") {
t.Fatalf("failed to get error when evaluating follower read offset without " +
"an enterprise license")
}
}

func TestCanSendToFollower(t *testing.T) {
defer leaktest.AfterTest(t)()
disableEnterprise := utilccl.TestingEnableEnterprise()
defer disableEnterprise()
st := cluster.MakeTestingClusterSettings()
storage.FollowerReadsEnabled.Override(&st.SV, true)
old := hlc.Timestamp{
WallTime: timeutil.Now().Add(2 * expectedFollowerReadOffset).UnixNano(),
}
oldHeader := roachpb.Header{Txn: &roachpb.Transaction{
OrigTimestamp: old,
}}
rw := roachpb.BatchRequest{Header: oldHeader}
rw.Add(&roachpb.PutRequest{})
if canSendToFollower(uuid.MakeV4(), st, rw) {
t.Fatalf("should not be able to send a rw request to a follower")
}
roNoTxn := roachpb.BatchRequest{}
roNoTxn.Add(&roachpb.GetRequest{})
if canSendToFollower(uuid.MakeV4(), st, roNoTxn) {
t.Fatalf("should not be able to send a batch with no txn to a follower")
}
roOld := roachpb.BatchRequest{Header: oldHeader}
roOld.Add(&roachpb.GetRequest{})
if !canSendToFollower(uuid.MakeV4(), st, roOld) {
t.Fatalf("should be able to send an old ro batch to a follower")
}
storage.FollowerReadsEnabled.Override(&st.SV, false)
if canSendToFollower(uuid.MakeV4(), st, roOld) {
t.Fatalf("should not be able to send an old ro batch to a follower when follower reads are disabled")
}
storage.FollowerReadsEnabled.Override(&st.SV, true)
roNew := roachpb.BatchRequest{Header: roachpb.Header{
Txn: &roachpb.Transaction{
OrigTimestamp: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()},
},
}}
if canSendToFollower(uuid.MakeV4(), st, roNew) {
t.Fatalf("should not be able to send a new ro batch to a follower")
}
disableEnterprise()
if canSendToFollower(uuid.MakeV4(), st, roOld) {
t.Fatalf("should not be able to send an old ro batch to a follower without enterprise enabled")
}
}

func TestFollowerReadMultipleValidation(t *testing.T) {
defer leaktest.AfterTest(t)()
defer func() {
if r := recover(); r == nil {
t.Fatalf("expected panic from setting followerReadMultiple to .1")
}
}()
st := cluster.MakeTestingClusterSettings()
followerReadMultiple.Override(&st.SV, .1)
}

// TestOracle tests the OracleFactory exposed by this package.
// This test ends up being rather indirect but works by checking if the type
// of the oracle returned from the factory differs between requests we'd
// expect to support follower reads and that which we'd expect not to.
func TestOracleFactory(t *testing.T) {
defer leaktest.AfterTest(t)()
disableEnterprise := utilccl.TestingEnableEnterprise()
defer disableEnterprise()
st := cluster.MakeTestingClusterSettings()
storage.FollowerReadsEnabled.Override(&st.SV, true)
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
aCtx := log.AmbientContext{Tracer: tracing.NewTracer()}
rpcContext := rpc.NewContext(
aCtx,
&base.Config{Insecure: true},
clock,
stopper,
&st.Version,
)
c := client.NewDB(log.AmbientContext{
Tracer: tracing.NewTracer(),
}, client.MockTxnSenderFactory{},
hlc.NewClock(hlc.UnixNano, time.Nanosecond))
txn := client.NewTxn(context.TODO(), c, 0, client.RootTxn)
of := replicaoracle.NewOracleFactory(followerReadAwareChoice, replicaoracle.Config{
Settings: st,
RPCContext: rpcContext,
})
noFollowerReadOracle := of.Oracle(txn)
old := hlc.Timestamp{
WallTime: timeutil.Now().Add(2 * expectedFollowerReadOffset).UnixNano(),
}
txn.SetFixedTimestamp(context.TODO(), old)
followerReadOracle := of.Oracle(txn)
if reflect.TypeOf(followerReadOracle) == reflect.TypeOf(noFollowerReadOracle) {
t.Fatalf("expected types of %T and %T to differ", followerReadOracle,
noFollowerReadOracle)
}
disableEnterprise()
disabledFollowerReadOracle := of.Oracle(txn)
if reflect.TypeOf(disabledFollowerReadOracle) != reflect.TypeOf(noFollowerReadOracle) {
t.Fatalf("expected types of %T and %T not to differ", disabledFollowerReadOracle,
noFollowerReadOracle)
}
}
Loading

0 comments on commit 70be833

Please sign in to comment.