Skip to content

Commit

Permalink
kvserver: replace circuit breaker cancel goroutine with per-Replica r…
Browse files Browse the repository at this point in the history
…egistry

PR cockroachdb#71806 originally landed with a very simple solution for the problem
of joining the breaker's cancellation chain with that of the incoming
context: spawning a goroutine. This was never meant to be the solution
in prod (spawning goroutines is not cheap) and this PR switches to a
real solution (which may still have soom room for improvement).

Replica circuit breakers are still off (cockroachdb#74705) and so this change
should not change any behavior, unless breakers are turned on. The
provided benchmarks show both the "end-to-end" `Replica.Send` as well as
the micro-bench `breaker.{R,Un}egister` perf and contrast it with that
of having the breakers disabled.

This sets the stage for evaluating and, if necessary, minimizing the
overhead, which (along with some additional end-to-end testing) then
allows us to turn breakers on by default (cockroachdb#74705).

`BenchmarkReplicaCircuitBreakerSendOverhead` measures `Replica.Send` of
an no-result single-point read. We see that we lose a few percent,
though this benchmark is pretty micro already.

`BenchmarkReplicaCircuitBreaker_Register` is even more micro, measuring
the overhead of making a cancel, and calling `Register()` followed by
`Unregister()`. The "old" measurement is essentially just creating a
cancelable context (so the +XXXX% numbers don't mean a lot; we're doing
work for which there was previously no analog). Here we can clearly see
that sharding the mutex map can help, though note that this would
already only be visible at the `Replica.Send` level at much higher
concurrencies than in this test (16).

I also ran the kv95 roachtest with what corresponds to the `mutexmap-1`
strategy above (i.e. a single mutex); [raw data here]. In this
configuration (relative to disabling breakers) `kv95/enc=false/nodes=1`
and `kv95/enc=false/nodes=1/cpu=32` sustained a -1.62% hit (in qps,
median over five runs). Once we've chosen a suitable shard count, I
expect that to drop a bit as well, and think we can turn on breakers in
production, at least from a performance point of view.

[raw data here]: https://gist.github.com/tbg/44b53a10d18490a6dabd57e1d1d3225e

old = disabled, new = enabled:

```
ReplicaCircuitBreakerSendOverhead/X/mutexmap-1-16     1.63µs ± 4%    1.77µs ± 5%     +8.38%  (p=0.000 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-2-16     1.66µs ± 4%    1.75µs ± 5%     +5.74%  (p=0.000 n=10+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-4-16     1.63µs ± 6%    1.75µs ± 4%     +7.48%  (p=0.000 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-8-16     1.58µs ± 7%    1.73µs ± 4%     +9.47%  (p=0.000 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-12-16    1.57µs ± 4%    1.62µs ± 6%     +3.19%  (p=0.046 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-16-16    1.56µs ± 5%    1.63µs ± 7%     +4.20%  (p=0.007 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-20-16    1.53µs ± 5%    1.62µs ± 8%     +6.01%  (p=0.001 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-24-16    1.55µs ± 7%    1.61µs ± 5%     +3.50%  (p=0.049 n=11+11)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-32-16    1.57µs ± 7%    1.62µs ± 6%     +3.53%  (p=0.042 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-64-16    1.53µs ± 7%    1.64µs ± 5%     +6.84%  (p=0.000 n=11+12)

name                                                old alloc/op   new alloc/op   delta
ReplicaCircuitBreakerSendOverhead/X/mutexmap-1-16     1.42kB ± 0%    1.44kB ± 0%     +1.25%  (p=0.000 n=7+11)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-2-16     1.42kB ± 0%    1.44kB ± 0%     +1.19%  (p=0.000 n=9+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-4-16     1.42kB ± 0%    1.43kB ± 0%     +1.14%  (p=0.000 n=11+11)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-8-16     1.42kB ± 0%    1.43kB ± 0%     +1.24%  (p=0.000 n=9+11)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-12-16    1.42kB ± 0%    1.43kB ± 0%     +1.08%  (p=0.000 n=11+11)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-16-16    1.42kB ± 0%    1.43kB ± 0%     +1.14%  (p=0.000 n=10+10)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-20-16    1.42kB ± 0%    1.43kB ± 0%     +1.12%  (p=0.000 n=10+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-24-16    1.42kB ± 0%    1.43kB ± 0%     +1.09%  (p=0.000 n=11+11)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-32-16    1.41kB ± 0%    1.43kB ± 0%     +1.13%  (p=0.000 n=9+11)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-64-16    1.41kB ± 0%    1.43kB ± 0%     +1.23%  (p=0.000 n=9+12)

name                                                old allocs/op  new allocs/op  delta
ReplicaCircuitBreakerSendOverhead/X/mutexmap-1-16       11.0 ± 0%      12.0 ± 0%     +9.09%  (p=0.000 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-2-16       11.0 ± 0%      12.0 ± 0%     +9.09%  (p=0.000 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-4-16       11.0 ± 0%      12.0 ± 0%     +9.09%  (p=0.000 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-8-16       11.0 ± 0%      12.0 ± 0%     +9.09%  (p=0.000 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-12-16      11.0 ± 0%      12.0 ± 0%     +9.09%  (p=0.000 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-16-16      11.0 ± 0%      12.0 ± 0%     +9.09%  (p=0.000 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-20-16      11.0 ± 0%      12.0 ± 0%     +9.09%  (p=0.000 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-24-16      11.0 ± 0%      12.0 ± 0%     +9.09%  (p=0.000 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-32-16      11.0 ± 0%      12.0 ± 0%     +9.09%  (p=0.000 n=11+12)
ReplicaCircuitBreakerSendOverhead/X/mutexmap-64-16      11.0 ± 0%      12.0 ± 0%     +9.09%  (p=0.000 n=11+12)
```

```
ReplicaCircuitBreaker_Register/X/mutexmap-1-16        30.8ns ±21%   787.6ns ± 2%  +2457.74%  (p=0.000 n=11+11)
ReplicaCircuitBreaker_Register/X/mutexmap-2-16        31.6ns ± 3%   782.0ns ± 2%  +2376.38%  (p=0.000 n=9+12)
ReplicaCircuitBreaker_Register/X/mutexmap-4-16        31.0ns ± 0%   778.8ns ± 2%  +2409.61%  (p=0.000 n=9+12)
ReplicaCircuitBreaker_Register/X/mutexmap-8-16        31.0ns ± 0%   775.8ns ± 1%  +2403.11%  (p=0.000 n=10+10)
ReplicaCircuitBreaker_Register/X/mutexmap-12-16       31.0ns ± 1%   288.8ns ± 2%   +833.06%  (p=0.000 n=10+12)
ReplicaCircuitBreaker_Register/X/mutexmap-16-16       31.1ns ± 1%   324.6ns ± 4%   +945.05%  (p=0.000 n=10+12)
ReplicaCircuitBreaker_Register/X/mutexmap-20-16       31.1ns ± 0%   193.3ns ± 2%   +522.26%  (p=0.000 n=10+12)
ReplicaCircuitBreaker_Register/X/mutexmap-24-16       31.0ns ± 0%   286.1ns ± 1%   +822.80%  (p=0.000 n=9+12)
ReplicaCircuitBreaker_Register/X/mutexmap-32-16       31.0ns ± 0%   194.4ns ± 2%   +527.12%  (p=0.000 n=10+12)
ReplicaCircuitBreaker_Register/X/mutexmap-64-16       31.0ns ± 1%   121.3ns ± 2%   +291.01%  (p=0.000 n=10+12)

name                                                old alloc/op   new alloc/op   delta
ReplicaCircuitBreaker_Register/X/mutexmap-1-16         80.0B ± 0%     96.0B ± 0%    +20.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-2-16         80.0B ± 0%     96.0B ± 0%    +20.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-4-16         80.0B ± 0%     96.0B ± 0%    +20.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-8-16         80.0B ± 0%     96.0B ± 0%    +20.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-12-16        80.0B ± 0%     96.0B ± 0%    +20.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-16-16        80.0B ± 0%     96.0B ± 0%    +20.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-20-16        80.0B ± 0%     96.0B ± 0%    +20.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-24-16        80.0B ± 0%     96.0B ± 0%    +20.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-32-16        80.0B ± 0%     96.0B ± 0%    +20.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-64-16        80.0B ± 0%     96.0B ± 0%    +20.00%  (p=0.000 n=11+12)

name                                                old allocs/op  new allocs/op  delta
ReplicaCircuitBreaker_Register/X/mutexmap-1-16          2.00 ± 0%      3.00 ± 0%    +50.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-2-16          2.00 ± 0%      3.00 ± 0%    +50.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-4-16          2.00 ± 0%      3.00 ± 0%    +50.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-8-16          2.00 ± 0%      3.00 ± 0%    +50.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-12-16         2.00 ± 0%      3.00 ± 0%    +50.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-16-16         2.00 ± 0%      3.00 ± 0%    +50.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-20-16         2.00 ± 0%      3.00 ± 0%    +50.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-24-16         2.00 ± 0%      3.00 ± 0%    +50.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-32-16         2.00 ± 0%      3.00 ± 0%    +50.00%  (p=0.000 n=11+12)
ReplicaCircuitBreaker_Register/X/mutexmap-64-16         2.00 ± 0%      3.00 ± 0%    +50.00%  (p=0.000 n=11+12)
```

Created via:

```
$ ./dev bench ./pkg/kv/kvserver/ -f BenchmarkReplicaCircuitBreaker --count 10 | tee out.txt
$ for f in enabled=true enabled=false; do grep -F "${f}" out.txt | sed "s/${f}/X/" > "${f}"; done && benchstat 'enabled=false' 'enabled=true'
```

For the record, I also tried a `sync.Map`, but it was vastly inferior
for this use case as it is allocation-heavy and its writes are fairly
slow.

Touches cockroachdb#33007.
Fixes cockroachdb#74707.

Release note: None
  • Loading branch information
tbg committed Jan 31, 2022
1 parent 71becf3 commit 412a65a
Show file tree
Hide file tree
Showing 10 changed files with 549 additions and 73 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ go_library(
"replica_backpressure.go",
"replica_batch_updates.go",
"replica_circuit_breaker.go",
"replica_circuit_breaker_cancelstorage.go",
"replica_closedts.go",
"replica_command.go",
"replica_consistency.go",
Expand Down Expand Up @@ -223,6 +224,7 @@ go_test(
"client_rangefeed_test.go",
"client_relocate_range_test.go",
"client_replica_backpressure_test.go",
"client_replica_circuit_breaker_bench_test.go",
"client_replica_circuit_breaker_test.go",
"client_replica_gc_test.go",
"client_replica_test.go",
Expand Down
124 changes: 124 additions & 0 deletions pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
"context"
"fmt"
"math/rand"
"strconv"
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/stretchr/testify/require"
)

type replicaCircuitBreakerBench struct {
*testcluster.TestCluster
pool *sync.Pool // *BatchRequest
}

func (tc *replicaCircuitBreakerBench) repl(b *testing.B) *kvserver.Replica {
return tc.GetFirstStoreFromServer(b, 0).LookupReplica(keys.MustAddr(tc.ScratchRange(b)))
}

func setupCircuitBreakerReplicaBench(
b *testing.B, breakerEnabled bool, cs string,
) (*replicaCircuitBreakerBench, *stop.Stopper) {
b.Helper()

var numShards int
{
_, err := fmt.Sscanf(cs, "mutexmap-%d", &numShards)
require.NoError(b, err)
}
sFn := func() kvserver.CancelStorage { return &kvserver.MapCancelStorage{NumShards: numShards} }

var knobs kvserver.StoreTestingKnobs
knobs.CancelStorageFactory = sFn

var args base.TestClusterArgs
args.ServerArgs.Knobs.Store = &knobs
tc := testcluster.StartTestCluster(b, 1, args)

stmt := `SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '1000s'`
if !breakerEnabled {
stmt = `SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '0s'`
}
_, err := tc.ServerConn(0).Exec(stmt)
require.NoError(b, err)
wtc := &replicaCircuitBreakerBench{
TestCluster: tc,
}
wtc.pool = &sync.Pool{
New: func() interface{} {
repl := wtc.repl(b)
var ba roachpb.BatchRequest
ba.RangeID = repl.RangeID
ba.Timestamp = repl.Clock().NowAsClockTimestamp().ToTimestamp()
var k roachpb.Key
k = append(k, repl.Desc().StartKey.AsRawKey()...)
k = encoding.EncodeUint64Ascending(k, uint64(rand.Intn(1000)))
ba.Add(roachpb.NewGet(k, false))
return &ba
},
}
return wtc, tc.Stopper()
}

func BenchmarkReplicaCircuitBreakerSendOverhead(b *testing.B) {
defer leaktest.AfterTest(b)()
defer log.Scope(b).Close(b)
ctx := context.Background()

for _, enabled := range []bool{false, true} {
b.Run("enabled="+strconv.FormatBool(enabled), func(b *testing.B) {
dss := []string{
"mutexmap-1", "mutexmap-2", "mutexmap-4", "mutexmap-8", "mutexmap-12", "mutexmap-16",
"mutexmap-20", "mutexmap-24", "mutexmap-32", "mutexmap-64",
}
if !enabled {
dss = dss[:1]
}

for _, ds := range dss {
b.Run(ds, func(b *testing.B) {
b.ReportAllocs()
tc, stopper := setupCircuitBreakerReplicaBench(b, enabled, ds)
defer stopper.Stop(ctx)

repl := tc.repl(b)

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ba := tc.pool.Get().(*roachpb.BatchRequest)
_, err := repl.Send(ctx, *ba)
tc.pool.Put(ba)
if err != nil {
b.Fatal(err)
}
}
})
})
}
})
}
}
19 changes: 19 additions & 0 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,11 @@ func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Reque
ba.Timestamp = repl.Clock().Now()
ba.Add(req)
ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration)
// Tag the breaker with the request. Once Send returns, we'll check that it's
// no longer tracked by the breaker. This gives good coverage that we're not
// going to leak memory.
ctx = context.WithValue(ctx, req, struct{}{})

defer cancel()
_, pErr := repl.Send(ctx, ba)
// If our context got canceled, return an opaque error regardless of presence or
Expand All @@ -548,6 +553,20 @@ func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Reque
if err := ctx.Err(); err != nil {
pErr = roachpb.NewErrorf("timed out waiting for batch response: %v", pErr)
}
{
var err error
repl.VisitBreakerContexts(func(ctx context.Context) {
if err == nil && ctx.Value(req) != nil {
err = errors.Errorf(
"request %s returned but context still tracked in breaker", req,
)
}
})
if err != nil {
pErr = roachpb.NewErrorf("%s; after %v", err, pErr)
}
}

return pErr.GoError()
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ func (r *Replica) Breaker() *circuit2.Breaker {
return r.breaker.wrapped
}

func (r *Replica) VisitBreakerContexts(fn func(ctx context.Context)) {
r.breaker.visitCancels(fn)
}

func (r *Replica) AssertState(ctx context.Context, reader storage.Reader) {
r.raftMu.Lock()
defer r.raftMu.Unlock()
Expand Down
145 changes: 142 additions & 3 deletions pkg/kv/kvserver/replica_circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver

import (
"context"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
Expand Down Expand Up @@ -61,12 +62,141 @@ type replicaCircuitBreaker struct {
stopper *stop.Stopper
r replicaInCircuitBreaker
st *cluster.Settings
cancels CancelStorage
wrapped *circuit.Breaker

versionIsActive int32 // atomic
}

// Register takes a cancelable context and its cancel function, and registers
// them with the circuit breaker. If the breaker is already tripped, its error
// is returned immediately and the caller should not continue processing the
// request. Otherwise, the cancel function is invoked when the breaker trips.
// The caller is provided with a token and signaller for use in a call to
// Unregister upon request completion. That method also takes the error (if any)
// resulting from the request to ensure that in the case of a tripped breaker,
// the error reflects this fact.
func (br *replicaCircuitBreaker) Register(
ctx context.Context, cancel func(),
) (_token interface{}, _ signaller, _ error) {
brSig := br.Signal()

if brSig.C() == nil {
// Circuit breakers are disabled, so don't do any work registering the
// context. Unregister will know that we didn't since it checks the
// same brSig.
return ctx, brSig, nil
}

// TODO(tbg): we may want to exclude more requests from this check, or allow
// requests to exclude themselves from the check (via their header). This
// latter mechanism could also replace isCircuitBreakerProbe.
if isCircuitBreakerProbe(ctx) {
brSig = neverTripSignaller{}
}

// NB: it might be tempting to check the breaker error first to avoid the call
// to Set below if the breaker is tripped at this point. However, the ordering
// here, subtly, is required to avoid situations in which the cancel is still
// in the map despite the probe having shut down (in which case cancel will
// not be invoked until the probe is next triggered, which maybe "never").
//
// To see this, consider the case in which the breaker is initially not
// tripped when we check, but then trips immediately and has the probe fail
// (and terminate). Since the probe is in charge of cancelling all tracked
// requests, we must ensure that this probe sees our request. Adding the
// request prior to calling Signal() means that if we see an untripped
// breaker, no probe is running - consequently should the breaker then trip,
// it will observe our cancel, thus avoiding a leak. If we observe a tripped
// breaker, we also need to remove our own cancel, as the probe may already
// have passed the point at which it iterates through the cancels prior to us
// inserting it. The cancel may be invoked twice, but that's ok.
//
// See TestReplicaCircuitBreaker_NoCancelRace.
tok := br.cancels.Set(ctx, cancel)
if err := brSig.Err(); err != nil {
br.cancels.Del(tok)
cancel()
return nil, nil, err
}

return tok, brSig, nil
}

// Unregister releases a tracked cancel function upon request completion.
// The error resulting from the request is passed in to allow decorating
// it in case the breaker tripped while the request was in-flight.
//
// See Register.
func (br *replicaCircuitBreaker) Unregister(
tok interface{}, sig signaller, pErr *roachpb.Error,
) *roachpb.Error {
brErr := sig.Err()
if sig.C() == nil {
// Breakers were disabled and we never put the cancel in the registry.
return pErr
}

br.cancels.Del(tok)

if pErr == nil {
return nil
}

err := pErr.GoError()
if ae := (&roachpb.AmbiguousResultError{}); errors.As(err, &ae) {
// The breaker tripped while a command was inflight, so we have to
// propagate an ambiguous result. We don't want to replace it, but there
// is a way to stash an Error in it so we use that.
//
// TODO(tbg): could also wrap it; there is no other write to WrappedErr
// in the codebase and it might be better to remove it. Nested *Errors
// are not a good idea.
wrappedErr := brErr
if ae.WrappedErr != nil {
wrappedErr = errors.Wrapf(brErr, "%v", ae.WrappedErr)
}
ae.WrappedErr = roachpb.NewError(wrappedErr)
return roachpb.NewError(ae)
} else if le := (&roachpb.NotLeaseHolderError{}); errors.As(err, &le) {
// When a lease acquisition triggered by this request is short-circuited
// by the breaker, it will return an opaque NotLeaseholderError, which we
// replace with the breaker's error.
return roachpb.NewError(errors.CombineErrors(brErr, le))
}
return pErr
}

func (br *replicaCircuitBreaker) visitCancels(f func(context.Context)) {
br.cancels.Visit(func(ctx context.Context, _ func()) (remove bool) {
f(ctx)
return false // keep
})
}

func (br *replicaCircuitBreaker) cancelAllTrackedContexts() {
br.cancels.Visit(func(ctx context.Context, cancel func()) (remove bool) {
cancel()
return true // remove
})
}

func (br *replicaCircuitBreaker) canEnable() bool {
b := atomic.LoadInt32(&br.versionIsActive) == 1
if b {
return true // fast path
}
// IsActive is mildly expensive since it has to unmarshal
// a protobuf.
if br.st.Version.IsActive(context.Background(), clusterversion.ProbeRequest) {
atomic.StoreInt32(&br.versionIsActive, 1)
return true
}
return false // slow path
}

func (br *replicaCircuitBreaker) enabled() bool {
return replicaCircuitBreakerSlowReplicationThreshold.Get(&br.st.SV) > 0 &&
br.st.Version.IsActive(context.Background(), clusterversion.ProbeRequest)
return replicaCircuitBreakerSlowReplicationThreshold.Get(&br.st.SV) > 0 && br.canEnable()
}

func (br *replicaCircuitBreaker) newError() error {
Expand Down Expand Up @@ -108,6 +238,7 @@ func newReplicaCircuitBreaker(
stopper *stop.Stopper,
ambientCtx log.AmbientContext,
r replicaInCircuitBreaker,
s CancelStorage,
onTrip func(),
onReset func(),
) *replicaCircuitBreaker {
Expand All @@ -117,7 +248,8 @@ func newReplicaCircuitBreaker(
r: r,
st: cs,
}

br.cancels = s
br.cancels.Reset()
br.wrapped = circuit.NewBreaker(circuit.Options{
Name: "breaker", // log bridge has ctx tags
AsyncProbe: br.asyncProbe,
Expand Down Expand Up @@ -173,6 +305,13 @@ func (br *replicaCircuitBreaker) asyncProbe(report func(error), done func()) {
return
}

// First, tell all current requests to fail fast. Note that clients insert
// first, then check the breaker (and remove themselves if breaker already
// tripped then). This prevents any cancels from sneaking in after the probe
// gets past this point, which could otherwise leave cancels hanging until
// "something" triggers the next probe (which may be never if no more traffic
// arrives at the Replica). See Register.
br.cancelAllTrackedContexts()
err := sendProbe(ctx, br.r)
report(err)
}); err != nil {
Expand Down
Loading

0 comments on commit 412a65a

Please sign in to comment.