Skip to content

Commit

Permalink
kv: circuit-break requests to unavailable replicas
Browse files Browse the repository at this point in the history
Fixes #33007.
Closes #61311.

This PR introduces a new circuit breaker package that was first
prototyped in #70485. These circuit breakers never recruit regular
requests to do the probing but instead have a configurable probe
attached that determines when the breaker untrips. (It can be tripped
proactively or by client traffic, similar to the old breaker).

They are then used to address #33007: when a replica becomes
unavailable, it should eagerly refuse traffic that it believes
would simply hang.

Concretely, whenever a request (a lease acquisition attempt or
a replicated write) does not manage to replicate within
`base.SlowRequestThreshold` (15s at time of writing), the
breaker is tripped. The corresponding probe uses a newly
introduced `NoopWrite` which is a writing request that does
not mutate state but which always goes through the replication
layer and which gets to bypass the lease.

TODO (generally pulling sizeable chunks out into their own PRs and
landing them in some good order):

- [ ] rewrite circuit breaker internals to avoid all of the `unsafe`
- [ ] make base.SlowRequestThreshold overridable via TestingKnob
- [ ] add end-to-end test using TestCluster verifying the tripping and
      fail-fast behavior under various unavailability conditions (for
      example blocking during evaluation, or making the liveness range
      unavailable).
- [ ] add version gate for NoopWriteRequest (own PR)
- [ ] add targeted tests for NoopWriteRequest (in PR above)
- [ ] add cluster setting to disable breakers
- [ ] introduce a structured error for circuit breaker failures and file
      issue for SQL Observability to render this error nicely
      (translating table names, etc)
- [ ] Make sure the breaker also trips on pipelined writes.
- [ ] address, file issues for, or explicitly discard any inline TODOs
      added in the diff.
- [ ] write the final release note.

Release note (ops change): TODO
  • Loading branch information
tbg committed Nov 8, 2021
1 parent fac33a5 commit 8d559f0
Show file tree
Hide file tree
Showing 20 changed files with 1,968 additions and 858 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ go_library(
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/bufalloc",
"//pkg/util/circuit",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
Expand Down
36 changes: 36 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_noop_write.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2014 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
)

func init() {
RegisterReadWriteCommand(roachpb.NoopWrite, DefaultDeclareKeys, NoopWrite)
}

// NoopWrite causes an effectless round-trip through the replication layer,
// i.e. it is a write that does not change any kv pair.
func NoopWrite(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {
return result.Result{
Replicated: kvserverpb.ReplicatedEvalResult{
IsNoopWrite: true,
},
}, nil
}
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,11 @@ func (p *Result) MergeAndDestroy(q Result) error {
}
q.Replicated.PriorReadSummary = nil

if !p.Replicated.IsNoopWrite {
p.Replicated.IsNoopWrite = q.Replicated.IsNoopWrite
}
q.Replicated.IsNoopWrite = false

if p.Local.EncounteredIntents == nil {
p.Local.EncounteredIntents = q.Local.EncounteredIntents
} else {
Expand Down
280 changes: 280 additions & 0 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
"context"
"sync/atomic"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestReplicaCircuitBreaker(t *testing.T) {
defer leaktest.AfterTest(t)
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := setupCircuitBreakerTest(t)
defer tc.Stopper().Stop(ctx)

const (
n1 = 0
n2 = 1
)

runCircuitBreakerTest(t, "breaker-ok", func(t *testing.T, ctx context.Context, tc *circuitBreakerTest) {
// Circuit breaker doesn't get in the way of anything unless
// something trips it.
require.NoError(t, tc.Write(n1))
tc.RequireIsNotLeaseholderError(t, tc.Write(n2))
require.NoError(t, tc.Read(n1))
tc.RequireIsNotLeaseholderError(t, tc.Read(n2))
})

runCircuitBreakerTest(t, "leaseholder-tripped", func(t *testing.T, ctx context.Context, tc *circuitBreakerTest) {
// Get lease on n1.
require.NoError(t, tc.Write(n1))
// Disable the probe so that when the breaker trips, it stays that tripped.
tc.SetProbeEnabled(n1, false)
tc.Report(n1, errors.New("boom"))

// n1, despite the tripped probe, can still serve reads as long as they
// are valid under the lease. But writes fail fast.
require.NoError(t, tc.Read(n1))
tc.RequireIsBreakerOpen(t, tc.Write(n1))

// n2 does not have the lease so all it does is redirect to the leaseholder
// n1.
tc.RequireIsNotLeaseholderError(t, tc.Read(n2))
tc.RequireIsNotLeaseholderError(t, tc.Write(n2))

// Enable the probe. Even a read should trigger the probe
// and within due time the breaker should heal.
tc.SetProbeEnabled(n1, true)
tc.UntripsSoon(t, tc.Read, n1)
// Same behavior on writes.
tc.Report(n1, errors.New("boom again"))
tc.UntripsSoon(t, tc.Write, n1)
})

runCircuitBreakerTest(t, "leaseless-tripped", func(t *testing.T, ctx context.Context, tc *circuitBreakerTest) {
// Put the lease on n1 but then trip the breaker with the probe
// disabled.
require.NoError(t, tc.Write(n1))
tc.SetProbeEnabled(n1, false)
tc.Report(n1, errors.New("boom"))
resumeHeartbeats := tc.PauseHeartbeatsAndExpireAllLeases(t)

// n2 (not n1) will return a NotLeaseholderError. This may be surprising -
// why isn't it trying and succeeding to acquire a lease - but it does
// not do that because it sees that the new leaseholder (n2) is not live
// itself. We'll revisit this after re-enabling liveness later in the test.
{
err := tc.Read(n2)
// At time of writing: not incrementing epoch on n1 because next
// leaseholder (n2) not live.
t.Log(err)
tc.RequireIsNotLeaseholderError(t, err)
// Same behavior for write on n2.
tc.RequireIsNotLeaseholderError(t, tc.Write(n2))
}
// On n1, run into the circuit breaker when requesting lease.
{
tc.RequireIsBreakerOpen(t, tc.Read(n1))
tc.RequireIsBreakerOpen(t, tc.Write(n1))
}

// Let the breaker heal and things should go back to normal. This is not a
// trivial thing to hold, as the probe needs to go through for this, and if
// we're not careful, the probe itself is held up by the breaker as well.
// Since the probe leads to a lease acquisition and the lease acquisition is
// fairly disjoint from the request that triggered it, there is custom code
// to make this work.
resumeHeartbeats()
tc.SetProbeEnabled(n1, true)
tc.UntripsSoon(t, tc.Read, n1)
tc.UntripsSoon(t, tc.Write, n1)
tc.RequireIsNotLeaseholderError(t, tc.Read(n2))
tc.RequireIsNotLeaseholderError(t, tc.Write(n2))
})
}

// Test infrastructure below.

func makeBreakerToggleable(b *circuit.Breaker) (setProbeEnabled func(bool)) {
opts := b.Opts()
origProbe := opts.AsyncProbe
var disableProbe int32
opts.AsyncProbe = func(report func(error), done func()) {
if atomic.LoadInt32(&disableProbe) == 1 {
done()
return
}
origProbe(report, done)
}
b.Reconfigure(opts)
return func(to bool) {
var n int32
if !to {
n = 1
}
atomic.StoreInt32(&disableProbe, n)
}
}

type replWithKnob struct {
*kvserver.Replica
setProbeEnabled func(bool)
}

type circuitBreakerTest struct {
*testcluster.TestCluster
ManualClock *hlc.HybridManualClock
repls []replWithKnob // 0 -> repl on Servers[0], etc
}

func runCircuitBreakerTest(
t *testing.T, name string, f func(*testing.T, context.Context, *circuitBreakerTest),
) {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 4*testutils.DefaultSucceedsSoonDuration)
defer cancel()
tc := setupCircuitBreakerTest(t)
defer tc.Stopper().Stop(ctx)
f(t, ctx, tc)
})
}

func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest {
manualClock := hlc.NewHybridManualClock()
args := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ClockSource: manualClock.UnixNano,
},
},
},
}
tc := testcluster.StartTestCluster(t, 2, args)

k := tc.ScratchRange(t)

tc.AddVotersOrFatal(t, k, tc.Target(1))

var repls []replWithKnob
for i := range tc.Servers {
repl := tc.GetFirstStoreFromServer(t, i).LookupReplica(keys.MustAddr(k))
enableProbe := makeBreakerToggleable(repl.Breaker())
repls = append(repls, replWithKnob{repl, enableProbe})
}
return &circuitBreakerTest{
TestCluster: tc,
ManualClock: manualClock,
repls: repls,
}
}

func (cbt *circuitBreakerTest) SetProbeEnabled(idx int, to bool) {
cbt.repls[idx].setProbeEnabled(to)
}

func (cbt *circuitBreakerTest) Report(idx int, err error) {
cbt.repls[idx].Replica.Breaker().Report(err)
}

func (cbt *circuitBreakerTest) UntripsSoon(t *testing.T, method func(idx int) error, idx int) {
testutils.SucceedsSoon(t, func() error {
err := method(idx)
// All errors coming out should be annotated as coming from
// the circuit breaker.
if err != nil && !errors.Is(err, circuit.ErrBreakerOpen()) {
t.Errorf("saw unexpected error %+v", err)
}
return err
})
}

func (cbt *circuitBreakerTest) PauseHeartbeatsAndExpireAllLeases(t *testing.T) (undo func()) {
var maxWT int64
var fs []func()
for _, srv := range cbt.Servers {
lv := srv.NodeLiveness().(*liveness.NodeLiveness)
undo := lv.PauseAllHeartbeatsForTest()
fs = append(fs, undo)
self, ok := lv.Self()
require.True(t, ok)
if maxWT < self.Expiration.WallTime {
maxWT = self.Expiration.WallTime
}
}
cbt.ManualClock.Forward(maxWT + 1)
return func() {
for _, f := range fs {
f()
}
}
}

func (*circuitBreakerTest) sendBatchRequest(repl *kvserver.Replica, req roachpb.Request) error {
var ba roachpb.BatchRequest
ba.RangeID = repl.Desc().RangeID
ba.Timestamp = repl.Clock().Now()
ba.Add(req)
ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration)
defer cancel()
_, pErr := repl.Send(ctx, ba)
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "timed out waiting for batch response")
}
return pErr.GoError()
}

func (*circuitBreakerTest) RequireIsBreakerOpen(t *testing.T, err error) {
require.True(t, errors.Is(err, circuit.ErrBreakerOpen()), "%+v", err)
}

func (*circuitBreakerTest) RequireIsNotLeaseholderError(t *testing.T, err error) {
_, ok := err.(*roachpb.NotLeaseHolderError)
require.True(t, ok, "%+v", err)
}

func (cbt *circuitBreakerTest) Write(idx int) error {
return cbt.writeViaRepl(cbt.repls[idx].Replica)
}

func (cbt *circuitBreakerTest) Read(idx int) error {
return cbt.readViaRepl(cbt.repls[idx].Replica)
}

func (cbt *circuitBreakerTest) writeViaRepl(repl *kvserver.Replica) error {
put := roachpb.NewPut(repl.Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello"))
return cbt.sendBatchRequest(repl, put)
}

func (cbt *circuitBreakerTest) readViaRepl(repl *kvserver.Replica) error {
get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */)
return cbt.sendBatchRequest(repl, get)
}
Loading

0 comments on commit 8d559f0

Please sign in to comment.