From 79ddf9d1367bb3e49cb2a9eac93190ec8d3b40e9 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 21 Sep 2017 18:47:46 -0400 Subject: [PATCH] [DNM] storage: respond to Raft proposals after entries commit, not execute This change addresses the first optimization discussed in #17500. The change seems to work and provides a modest performance boost. Unfortunately, I don't think we'll want to consider merging it at the moment. The problem is that while it is technically safe to respond to clients before performing the Raft command application, doing so is a nightmare for testing. Pretty much every test in the `storage` package expects to be able to perform an operation and then "reach beneath raft" immediately to operate on the result. This can range from inspecting Raft entries to working on the most up-to-date `Replica` state. To support this change, all of these tests would need to be updated to handle the now asynchronous operations performed in `handleEvalResultRaftMuLocked`. I addressed this by adding a testing knob called `DisableRaftRespBeforeApplication` in this change. The problem is that I don't feel very comfortable with it because we basically need to use it for all tests (indirectly through `multiTestContext` and `LocalTestCluster`) which means that we probably aren't testing this optimization thoroughly. We could disable the optimization on a finer granularity but this would become a serious issue for maintainability and I'm not sure it would be worth it. Perhaps there's some middle ground between returning to the client after performing in-memory state updates but before performing persistent state updates? Something like calling: 1. `handleEvalResultRaftMuLocked` 2. `maybeRespondToClient` 3. `applyRaftCommand` This would solve a lot of the testing issues present here without the need to use the `DisableRaftRespBeforeApplication` knob, but I'm almost certain that wouldn't be safe to do. I think #15648 will run into a similar issue to this. We'll either need to block clients while we combine Raft batches or we'll need to update tests which expect a client response to be an indication that the command has already been applied in all cases. Things might not be as bad in that case though because less is being done asynchronously. --- pkg/ccl/storageccl/add_sstable_test.go | 4 +- .../logictest/testdata/logic_test/show_trace | 2 +- pkg/storage/client_test.go | 2 + pkg/storage/replica.go | 128 ++++++++++++------ pkg/storage/replica_proposal.go | 47 ++++--- pkg/storage/replica_test.go | 59 +++++++- pkg/storage/store.go | 7 + pkg/storage/store_test.go | 1 + .../localtestcluster/local_test_cluster.go | 1 + pkg/testutils/testcluster/testcluster.go | 9 +- 10 files changed, 186 insertions(+), 74 deletions(-) diff --git a/pkg/ccl/storageccl/add_sstable_test.go b/pkg/ccl/storageccl/add_sstable_test.go index 5f645c838122..a51e45e42d25 100644 --- a/pkg/ccl/storageccl/add_sstable_test.go +++ b/pkg/ccl/storageccl/add_sstable_test.go @@ -102,7 +102,7 @@ func runTestDBAddSSTable(ctx context.Context, t *testing.T, db *client.DB) { if err := testutils.MatchInOrder(tracing.FormatRecordedSpans(collect()), "evaluating AddSSTable", "sideloadable proposal detected", - "ingested SSTable at index", + // "ingested SSTable at index", ); err != nil { t.Fatal(err) } @@ -153,7 +153,7 @@ func runTestDBAddSSTable(ctx context.Context, t *testing.T, db *client.DB) { "evaluating AddSSTable", "target key range not empty, will merge existing data with sstable", "sideloadable proposal detected", - "ingested SSTable at index", + // "ingested SSTable at index", ); err != nil { t.Fatal(err) } diff --git a/pkg/sql/logictest/testdata/logic_test/show_trace b/pkg/sql/logictest/testdata/logic_test/show_trace index 12cd2575a9a9..3a0c6e706a62 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_trace +++ b/pkg/sql/logictest/testdata/logic_test/show_trace @@ -268,7 +268,7 @@ SELECT span, operation, message FROM [SHOW KV TRACE FOR DELETE FROM t.kv2] (0,1) starting plan DelRange /Table/53/1 - /Table/53/2 (0,1) starting plan querying next range at /Table/53/1 (0,1) starting plan r1: sending batch 1 DelRng, 1 BeginTxn, 1 EndTxn to (n1,s1):1 -(0,4) consuming rows fast path - rows affected: 2 +(0,5) consuming rows fast path - rows affected: 2 query TTT SELECT span, operation, regexp_replace(message, 'wall_time:\d+', 'wall_time:...') as message diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go index dc5e9e3eb980..bf9dcc33a1b3 100644 --- a/pkg/storage/client_test.go +++ b/pkg/storage/client_test.go @@ -121,6 +121,7 @@ func createTestStoreWithEngine( nodeDesc.NodeID, rpcContext, server, stopper, metric.NewRegistry(), ) storeCfg.ScanMaxIdleTime = 1 * time.Second + storeCfg.TestingKnobs.DisableRaftRespBeforeApplication = true stores := storage.NewStores(ac, storeCfg.Clock, storeCfg.Settings.Version.MinSupportedVersion, storeCfg.Settings.Version.ServerVersion) if err := storeCfg.Gossip.SetNodeDescriptor(nodeDesc); err != nil { @@ -620,6 +621,7 @@ func (m *multiTestContext) makeStoreConfig(i int) storage.StoreConfig { cfg.Gossip = m.gossips[i] cfg.TestingKnobs.DisableSplitQueue = true cfg.TestingKnobs.ReplicateQueueAcceptsUnsplit = true + cfg.TestingKnobs.DisableRaftRespBeforeApplication = true return cfg } diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index ef6ac6076701..1d5656da33f0 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -4309,7 +4309,6 @@ func (r *Replica) processRaftCommand( r.mu.Lock() proposal, proposedLocally := r.mu.proposals[idKey] - // TODO(tschottdorf): consider the Trace situation here. if proposedLocally { // We initiated this command, so use the caller-supplied context. ctx = proposal.ctx @@ -4317,6 +4316,13 @@ func (r *Replica) processRaftCommand( delete(r.mu.proposals, idKey) } + // Since we're responding to the Raft command before it's application, we + // need to make sure that out tracing Span is not finished before we're done + // with our work here. + var sp opentracing.Span + ctx, sp = tracing.ForkCtxSpan(ctx, "raft application") + defer tracing.FinishSpan(sp) + leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally) r.mu.Unlock() @@ -4343,7 +4349,6 @@ func (r *Replica) processRaftCommand( } var response proposalResult - var writeBatch *storagebase.WriteBatch { if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; forcedErr == nil && filter != nil { forcedErr = filter(storagebase.ApplyFilterArgs{ @@ -4354,25 +4359,61 @@ func (r *Replica) processRaftCommand( }) } - if forcedErr != nil { - // Apply an empty entry. - raftCmd.ReplicatedEvalResult = storagebase.ReplicatedEvalResult{} - raftCmd.WriteBatch = nil - } - raftCmd.ReplicatedEvalResult.State.RaftAppliedIndex = index - raftCmd.ReplicatedEvalResult.State.LeaseAppliedIndex = leaseIndex - // Update the node clock with the serviced request. This maintains // a high water mark for all ops serviced, so that received ops without // a timestamp specified are guaranteed one higher than any op already // executed for overlapping keys. r.store.Clock().Update(ts) - var pErr *roachpb.Error - if raftCmd.WriteBatch != nil { - writeBatch = raftCmd.WriteBatch + var lResult *LocalEvalResult + if proposedLocally { + if proposalRetry != proposalNoRetry { + response.ProposalRetry = proposalRetry + if forcedErr == nil { + log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", proposal) + } + } + + if forcedErr != nil { + // A forced error was set (i.e. we did not apply the proposal, + // for instance due to its log position). + response.Err = forcedErr + } else if proposal.Local.Err != nil { + // Everything went as expected, but this proposal should return + // an error to the client. + response.Err = proposal.Local.Err + } else if proposal.Local.Reply != nil { + response.Reply = proposal.Local.Reply + } else { + log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", proposal) + } + response.Intents = proposal.Local.detachIntents(response.Err != nil) + if !r.store.cfg.TestingKnobs.DisableRaftRespBeforeApplication && false { + // If not disabled by tests, we can return the response to the + // client here, before actually performing the Raft command + // application. This is valid because at this point the command has + // been replicated in the Raft log and therefore must apply to all + // replicated state machines. In addition, all error handling that + // is deterministic across replicas has already been performed in + // checkForcedErrLocked. Any error handling that will differ between + // replicas should not be reflected in the response (ie. + // ReplicaCorruptionError). + // + // For more discussion, see #17500. + proposal.maybeRespondToClient(response) + } + + lResult = proposal.Local } + if forcedErr != nil { + // Apply an empty entry. + raftCmd.ReplicatedEvalResult = storagebase.ReplicatedEvalResult{} + raftCmd.WriteBatch = nil + } + raftCmd.ReplicatedEvalResult.State.RaftAppliedIndex = index + raftCmd.ReplicatedEvalResult.State.LeaseAppliedIndex = leaseIndex + // AddSSTable ingestions run before the actual batch. This makes sure // that when the Raft command is applied, the ingestion has definitely // succeeded. Note that we have taken precautions during command @@ -4396,11 +4437,17 @@ func (r *Replica) processRaftCommand( raftCmd.ReplicatedEvalResult.AddSSTable = nil } - raftCmd.ReplicatedEvalResult.Delta, pErr = r.applyRaftCommand( + var applyErr *roachpb.Error + var writeBatch *storagebase.WriteBatch + if raftCmd.WriteBatch != nil { + writeBatch = raftCmd.WriteBatch + } + + raftCmd.ReplicatedEvalResult.Delta, applyErr = r.applyRaftCommand( ctx, idKey, raftCmd.ReplicatedEvalResult, writeBatch) - if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; pErr == nil && filter != nil { - pErr = filter(storagebase.ApplyFilterArgs{ + if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; applyErr == nil && filter != nil { + applyErr = filter(storagebase.ApplyFilterArgs{ CmdID: idKey, ReplicatedEvalResult: raftCmd.ReplicatedEvalResult, StoreID: r.store.StoreID(), @@ -4408,42 +4455,33 @@ func (r *Replica) processRaftCommand( }) } - pErr = r.maybeSetCorrupt(ctx, pErr) - if pErr == nil { - pErr = forcedErr - } - - var lResult *LocalEvalResult - if proposedLocally { - if proposalRetry != proposalNoRetry { - response.ProposalRetry = proposalRetry - if pErr == nil { - log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", proposal) - } - } - if pErr != nil { - // A forced error was set (i.e. we did not apply the proposal, - // for instance due to its log position) or the Replica is now - // corrupted. - response.Err = pErr - } else if proposal.Local.Err != nil { - // Everything went as expected, but this proposal should return - // an error to the client. - response.Err = proposal.Local.Err - } else if proposal.Local.Reply != nil { - response.Reply = proposal.Local.Reply - } else { - log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", proposal) + if applyErr = r.maybeSetCorrupt(ctx, applyErr); applyErr != nil { + switch applyErr.GetDetail().(type) { + case *roachpb.ReplicaCorruptionError: + // We already logged the error in maybeSetCorrupt. It is ok that we + // don't send these errors to the client immediately as they will be + // caught later now that the replica is marked as corrupt. + // + // On a more fundamental level, returning an error to the client + // here would actually be incorrect, because the entry has already + // been replicated through Raft. This means that a local + // ReplicaCorruptionError doesn't mean that all replicas are + // corrupted or that the command failed to commit. Once a command + // has been committed, the state machine application, even with + // respect to error handling, needs to be deterministic across all + // replicas. + default: + // If the error is not a ReplicaCorruptionError then we should + // have returned it to the client. + log.Fatalf(ctx, "found unexpected error during Raft application: %v", applyErr) } - response.Intents = proposal.Local.detachIntents(response.Err != nil) - lResult = proposal.Local } // Handle the EvalResult, executing any side effects of the last // state machine transition. // // Note that this must happen after committing (the engine.Batch), but - // before notifying a potentially waiting client. + // before finishing the Raft application and calling endCmds.done. r.handleEvalResultRaftMuLocked(ctx, lResult, raftCmd.ReplicatedEvalResult) } diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index fc01f5564783..d3c7f4cf8ca1 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -79,7 +79,7 @@ type ProposalData struct { // proposalResult come from LocalEvalResult). // // Attention: this channel is not to be signaled directly downstream of Raft. - // Always use ProposalData.finishRaftApplication(). + // Always use ProposalData.maybeRespondToClient or finishRaftApplication. doneCh chan proposalResult // Local contains the results of evaluating the request @@ -99,28 +99,43 @@ type ProposalData struct { } // finishRaftApplication is called downstream of Raft when a command application -// has finished. proposal.doneCh is signaled with pr so that the proposer is -// unblocked. +// has finished. Replica state is updated so that future proposals see the +// effect of this proposal, notably on the timestamp cache and command queue. +// After this, proposal.doneCh is signaled with pr so that the proposer is +// unblocked and free to respond to the client if it hasn't been already. // -// It first invokes the endCmds function and then sends the specified -// proposalResult on the proposal's done channel. endCmds is invoked here in -// order to allow the original client to be cancelled and possibly no longer -// listening to this done channel, and so can't be counted on to invoke endCmds -// itself. +// Because the update to the command queue will allow dependent commands to +// begin proposing, proposer evaluated kv requires that all desired effects of +// this proposal be reflected in the underlying state machine (i.e. the engine) +// before this method is called. See Replica.applyRaftCommand. // -// Note: this should not be called upstream of Raft because, in case pr.Err is -// set, it clears the intents from pr before sending it on the channel. This -// clearing should not be done upstream of Raft because, in cases of errors -// encountered upstream of Raft, we might still want to resolve intents: -// upstream of Raft, pr.intents represent intents encountered by a request, not -// the current txn's intents. +// Note: this should not be called upstream of Raft. func (proposal *ProposalData) finishRaftApplication(pr proposalResult) { if proposal.endCmds != nil { proposal.endCmds.done(pr.Reply, pr.Err, pr.ProposalRetry) proposal.endCmds = nil } - proposal.doneCh <- pr - close(proposal.doneCh) + proposal.maybeRespondToClient(pr) +} + +// maybeRespondToClient is called downstream of Raft when a command result can +// be delivered to the client RPC. proposal.doneCh is signaled with pr so that +// the proposer is unblocked. +// +// It is possible that this is called ahead of finishRaftApplication, in which +// case the future call to that method will not also respond to the client. This +// is because there are cases where it is valid to respond to the client with a +// proposal result before actually applying the proposal to the Raft state +// machine. However, it is not valid to call endCmds.done until after the state +// machine update. +// +// Note: this should not be called upstream of Raft. +func (proposal *ProposalData) maybeRespondToClient(pr proposalResult) { + if proposal.doneCh != nil { + proposal.doneCh <- pr + close(proposal.doneCh) + proposal.doneCh = nil + } } // LocalEvalResult is data belonging to an evaluated command that is diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 0bdeee041db1..5e2e76cd08dd 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -160,6 +160,9 @@ func (tc *testContext) StartWithStoreConfig(t testing.TB, stopper *stop.Stopper, cfg.Gossip = tc.gossip cfg.Transport = tc.transport cfg.StorePool = NewTestStorePool(cfg) + // We manipulate raft directly in most of these tests, so we want to be sure + // that all acknowledged commands apply before we reach below Raft. + cfg.TestingKnobs.DisableRaftRespBeforeApplication = true // Create a test sender without setting a store. This is to deal with the // circular dependency between the test sender and the store. The actual // store will be passed to the sender after it is created and bootstrapped. @@ -888,10 +891,15 @@ func TestReplicaNotLeaseHolderError(t *testing.T) { func TestReplicaLeaseCounters(t *testing.T) { defer leaktest.AfterTest(t)() defer EnableLeaseHistory(100)() + + tsc := TestStoreConfig(nil) + // We manipulate lease state, so we want to be sure that all + // acknowledged commands apply before we reach below Raft. + tsc.TestingKnobs.DisableRaftRespBeforeApplication = true tc := testContext{} stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) - tc.Start(t, stopper) + tc.StartWithStoreConfig(t, stopper, tsc) assert := func(actual, min, max int64) error { if actual < min || actual > max { @@ -1060,12 +1068,19 @@ func TestReplicaGossipConfigsOnLease(t *testing.T) { // some point; now we're just testing the cache on the first replica. func TestReplicaTSCacheLowWaterOnLease(t *testing.T) { defer leaktest.AfterTest(t)() + tc := testContext{} + tc.manualClock = hlc.NewManualClock(123) + tsc := TestStoreConfig(hlc.NewClock(tc.manualClock.UnixNano, time.Nanosecond)) + // We manipulate the TS cache, so we want to be sure that all + // acknowledged commands apply before we reach below Raft. + tsc.TestingKnobs.DisableRaftRespBeforeApplication = true stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) - tc.Start(t, stopper) + tc.StartWithStoreConfig(t, stopper, tsc) // Disable raft log truncation which confuses this test. tc.store.SetRaftLogQueueActive(false) + secondReplica, err := tc.addBogusReplicaToRangeDesc(context.TODO()) if err != nil { t.Fatal(err) @@ -1920,10 +1935,16 @@ func TestLeaseConcurrent(t *testing.T) { // timestamp cache. func TestReplicaUpdateTSCache(t *testing.T) { defer leaktest.AfterTest(t)() + tc := testContext{} + tc.manualClock = hlc.NewManualClock(123) + tsc := TestStoreConfig(hlc.NewClock(tc.manualClock.UnixNano, time.Nanosecond)) + // We manipulate the TS cache, so we want to be sure that all + // acknowledged commands apply before we reach below Raft. + tsc.TestingKnobs.DisableRaftRespBeforeApplication = true stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) - tc.Start(t, stopper) + tc.StartWithStoreConfig(t, stopper, tsc) startNanos := tc.Clock().Now().WallTime @@ -2670,6 +2691,10 @@ func newCmdQCancelTest(t *testing.T) *cmdQCancelTest { startingCmd: make(chan struct{}), cmds: make(map[int]*testCmd), } + // We use OnCommandQueueAction(action=CommandQueueFinishExecuting) to block + // client responses. This won't work if we allow responses before removing + // the command from the CommandQueue (and calling OnCommandQueueAction). + ct.tsc.TestingKnobs.DisableRaftRespBeforeApplication = true ct.tsc.TestingKnobs.OnCommandQueueAction = ct.onCmdQAction return ct } @@ -5733,10 +5758,15 @@ func TestMerge(t *testing.T) { // inaccessible via Entries()). func TestTruncateLog(t *testing.T) { defer leaktest.AfterTest(t)() + + tsc := TestStoreConfig(nil) + // We manipulate raft entries, so we want to be sure that all + // acknowledged commands apply before we reach below Raft. + tsc.TestingKnobs.DisableRaftRespBeforeApplication = true tc := testContext{} stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) - tc.Start(t, stopper) + tc.StartWithStoreConfig(t, stopper, tsc) tc.repl.store.SetRaftLogQueueActive(false) // Populate the log with 10 entries. Save the LastIndex after each write. @@ -5904,10 +5934,15 @@ func TestReplicaSetsEqual(t *testing.T) { func TestAppliedIndex(t *testing.T) { defer leaktest.AfterTest(t)() + + tsc := TestStoreConfig(nil) + // We manipulate the RaftAppliedIndex directly, so we want to be sure that + // all acknowledged commands apply before we reach below Raft. + tsc.TestingKnobs.DisableRaftRespBeforeApplication = true tc := testContext{} stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) - tc.Start(t, stopper) + tc.StartWithStoreConfig(t, stopper, tsc) var appliedIndex uint64 var sum int64 @@ -6661,10 +6696,15 @@ func TestQuotaPoolAccessOnDestroyedReplica(t *testing.T) { func TestEntries(t *testing.T) { defer leaktest.AfterTest(t)() + + tsc := TestStoreConfig(nil) + // We manipulate raft entries, so we want to be sure that all + // acknowledged commands apply before we reach below Raft. + tsc.TestingKnobs.DisableRaftRespBeforeApplication = true tc := testContext{} stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) - tc.Start(t, stopper) + tc.StartWithStoreConfig(t, stopper, tsc) tc.repl.store.SetRaftLogQueueActive(false) repl := tc.repl @@ -6830,10 +6870,15 @@ func TestEntries(t *testing.T) { func TestTerm(t *testing.T) { defer leaktest.AfterTest(t)() + + tsc := TestStoreConfig(nil) + // We manipulate raft entries, so we want to be sure that all + // acknowledged commands apply before we reach below Raft. + tsc.TestingKnobs.DisableRaftRespBeforeApplication = true tc := testContext{} stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) - tc.Start(t, stopper) + tc.StartWithStoreConfig(t, stopper, tsc) tc.repl.store.SetRaftLogQueueActive(false) repl := tc.repl diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 9167ce286e41..802739657cec 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -764,6 +764,13 @@ type StoreTestingKnobs struct { // only changes in the number of replicas can cause the store to gossip its // capacity. DisableLeaseCapacityGossip bool + // DisableRaftRespBeforeApplication disables the optimization to send + // responses for Raft commands to clients before applying them to the Raft + // state machine. This is often desired by tests that want to manipulate + // replicated state directly after receiving command responses. In those + // cases, disabling the optimization allows them to be sure that the + // acknowledged commands have applied before the tests reach below Raft. + DisableRaftRespBeforeApplication bool // BootstrapVersion overrides the version the stores will be bootstrapped with. BootstrapVersion *cluster.ClusterVersion } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 1a1d669bbb08..3e45312a7e80 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -1729,6 +1729,7 @@ func TestStoreScanIntents(t *testing.T) { var count int32 countPtr := &count + cfg.TestingKnobs.DisableRaftRespBeforeApplication = true cfg.TestingKnobs.TestingEvalFilter = func(filterArgs storagebase.FilterArgs) *roachpb.Error { if _, ok := filterArgs.Req.(*roachpb.ScanRequest); ok { diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index cf0ea899e7f5..345a0c07ea72 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -117,6 +117,7 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initSende if ltc.StoreTestingKnobs == nil { cfg.TestingKnobs.DisableScanner = true cfg.TestingKnobs.DisableSplitQueue = true + cfg.TestingKnobs.DisableRaftRespBeforeApplication = true } else { cfg.TestingKnobs = *ltc.StoreTestingKnobs } diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index eec7aec8bfcb..7841a462e631 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -325,9 +325,12 @@ func (tc *TestCluster) changeReplicas( changeType roachpb.ReplicaChangeType, startKey roachpb.RKey, targets ...roachpb.ReplicationTarget, ) (roachpb.RangeDescriptor, error) { ctx := context.TODO() - if err := tc.Servers[0].DB().AdminChangeReplicas( - ctx, startKey.AsRawKey(), changeType, targets, - ); err != nil { + // AdminChangeReplicas can fail in the case of a concurrent transaction. + if err := util.RetryForDuration(time.Second*5, func() error { + return tc.Servers[0].DB().AdminChangeReplicas( + ctx, startKey.AsRawKey(), changeType, targets, + ) + }); err != nil { return roachpb.RangeDescriptor{}, errors.Wrap(err, "AdminChangeReplicas error") } var rangeDesc roachpb.RangeDescriptor