[DNM] storage: respond to Raft proposals after entries commit, not execute

This change addresses the first optimization discussed in cockroachdb#17500.

The change seems to work and provides a modest performance boost.
Unfortunately, I don't think we'll want to consider merging it at
the moment. The problem is that while it is technically safe to
respond to clients before performing the Raft command application,
doing so is a nightmare for testing. Pretty much every
test in the `storage` package expects to be able to perform an
operation and then "reach beneath raft" immediately to operate
on the result. This can range from inspecting Raft entries to
working on the most up-to-date `Replica` state.

To support this change, all of these tests would need to be
updated to handle the now asynchronous operations performed
in `handleEvalResultRaftMuLocked`. I addressed this by adding
a testing knob called `DisableRaftRespBeforeApplication` in
this change. The problem is that I don't feel very comfortable
with it because we basically need to use it for all tests
(indirectly through `multiTestContext` and `LocalTestCluster`),
which means that we probably aren't testing this optimization
thoroughly. We could disable the optimization at a finer
granularity, but that would become a serious maintainability
issue and I'm not sure it would be worth it.
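
For reference, a minimal sketch of how a test harness opts out of the
optimization (this mirrors the `client_test.go` hunks below; the helper
name is made up for illustration):

    package storage_test

    import "github.com/cockroachdb/cockroach/pkg/storage"

    // withConservativeRaftResponses is a hypothetical helper showing how a
    // test fixture disables the optimization via the new testing knob.
    func withConservativeRaftResponses(cfg storage.StoreConfig) storage.StoreConfig {
        // Force client responses to wait for Raft command application so the
        // test can "reach beneath raft" immediately after an operation returns.
        cfg.TestingKnobs.DisableRaftRespBeforeApplication = true
        return cfg
    }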

Perhaps there's some middle ground: returning to the client after
performing in-memory state updates but before performing persistent
state updates? Something like calling:
1. `handleEvalResultRaftMuLocked`
2. `maybeRespondToClient`
3. `applyRaftCommand`

This would solve a lot of the testing issues present here without
the need to use the `DisableRaftRespBeforeApplication` knob, but
I'm almost certain that wouldn't be safe to do.
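
For concreteness, a toy, self-contained sketch of that call ordering
(the real methods live on `*Replica` and `*ProposalData` and take many
more arguments; this models only the sequencing, not the actual
implementation):

    package main

    import "fmt"

    // toyReplica stands in for *Replica; the method names mirror the real
    // ones, but the bodies are placeholders for illustration only.
    type toyReplica struct{ appliedIndex uint64 }

    func (r *toyReplica) handleEvalResultRaftMuLocked() {
        fmt.Println("1. apply in-memory side effects of the command")
    }

    func (r *toyReplica) maybeRespondToClient() {
        fmt.Println("2. unblock the waiting client with the proposal result")
    }

    func (r *toyReplica) applyRaftCommand(index uint64) {
        r.appliedIndex = index
        fmt.Println("3. commit the persistent (engine) state update")
    }

    func main() {
        r := &toyReplica{}
        // The ordering floated above: respond to the client after the
        // in-memory updates but before the engine write. As noted, this is
        // suspected to be unsafe, since a client could then observe state
        // that is not yet durable on this replica.
        r.handleEvalResultRaftMuLocked()
        r.maybeRespondToClient()
        r.applyRaftCommand(42)
    }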

I think cockroachdb#15648 will run into a similar issue. We'll either
need to block clients while we combine Raft batches or we'll need
to update tests that expect a client response to indicate
that the command has already been applied in all cases. Things
might not be as bad in that case, though, because less is being
done asynchronously.
nvanbenschoten committed Sep 23, 2017
1 parent 241b883 commit 79ddf9d
Showing 10 changed files with 186 additions and 74 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/add_sstable_test.go
@@ -102,7 +102,7 @@ func runTestDBAddSSTable(ctx context.Context, t *testing.T, db *client.DB) {
if err := testutils.MatchInOrder(tracing.FormatRecordedSpans(collect()),
"evaluating AddSSTable",
"sideloadable proposal detected",
"ingested SSTable at index",
// "ingested SSTable at index",
); err != nil {
t.Fatal(err)
}
@@ -153,7 +153,7 @@ func runTestDBAddSSTable(ctx context.Context, t *testing.T, db *client.DB) {
"evaluating AddSSTable",
"target key range not empty, will merge existing data with sstable",
"sideloadable proposal detected",
"ingested SSTable at index",
// "ingested SSTable at index",
); err != nil {
t.Fatal(err)
}
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/show_trace
@@ -268,7 +268,7 @@ SELECT span, operation, message FROM [SHOW KV TRACE FOR DELETE FROM t.kv2]
(0,1) starting plan DelRange /Table/53/1 - /Table/53/2
(0,1) starting plan querying next range at /Table/53/1
(0,1) starting plan r1: sending batch 1 DelRng, 1 BeginTxn, 1 EndTxn to (n1,s1):1
(0,4) consuming rows fast path - rows affected: 2
(0,5) consuming rows fast path - rows affected: 2

query TTT
SELECT span, operation, regexp_replace(message, 'wall_time:\d+', 'wall_time:...') as message
2 changes: 2 additions & 0 deletions pkg/storage/client_test.go
@@ -121,6 +121,7 @@ func createTestStoreWithEngine(
nodeDesc.NodeID, rpcContext, server, stopper, metric.NewRegistry(),
)
storeCfg.ScanMaxIdleTime = 1 * time.Second
storeCfg.TestingKnobs.DisableRaftRespBeforeApplication = true
stores := storage.NewStores(ac, storeCfg.Clock, storeCfg.Settings.Version.MinSupportedVersion, storeCfg.Settings.Version.ServerVersion)

if err := storeCfg.Gossip.SetNodeDescriptor(nodeDesc); err != nil {
@@ -620,6 +621,7 @@ func (m *multiTestContext) makeStoreConfig(i int) storage.StoreConfig {
cfg.Gossip = m.gossips[i]
cfg.TestingKnobs.DisableSplitQueue = true
cfg.TestingKnobs.ReplicateQueueAcceptsUnsplit = true
cfg.TestingKnobs.DisableRaftRespBeforeApplication = true
return cfg
}

128 changes: 83 additions & 45 deletions pkg/storage/replica.go
@@ -4309,14 +4309,20 @@ func (r *Replica) processRaftCommand(
r.mu.Lock()
proposal, proposedLocally := r.mu.proposals[idKey]

// TODO(tschottdorf): consider the Trace situation here.
if proposedLocally {
// We initiated this command, so use the caller-supplied context.
ctx = proposal.ctx
proposal.ctx = nil // avoid confusion
delete(r.mu.proposals, idKey)
}

// Since we're responding to the Raft command before its application, we
// need to make sure that our tracing Span is not finished before we're done
// with our work here.
var sp opentracing.Span
ctx, sp = tracing.ForkCtxSpan(ctx, "raft application")
defer tracing.FinishSpan(sp)

leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally)

r.mu.Unlock()
@@ -4343,7 +4349,6 @@ }
}

var response proposalResult
var writeBatch *storagebase.WriteBatch
{
if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; forcedErr == nil && filter != nil {
forcedErr = filter(storagebase.ApplyFilterArgs{
@@ -4354,25 +4359,61 @@ })
})
}

if forcedErr != nil {
// Apply an empty entry.
raftCmd.ReplicatedEvalResult = storagebase.ReplicatedEvalResult{}
raftCmd.WriteBatch = nil
}
raftCmd.ReplicatedEvalResult.State.RaftAppliedIndex = index
raftCmd.ReplicatedEvalResult.State.LeaseAppliedIndex = leaseIndex

// Update the node clock with the serviced request. This maintains
// a high water mark for all ops serviced, so that received ops without
// a timestamp specified are guaranteed one higher than any op already
// executed for overlapping keys.
r.store.Clock().Update(ts)

var pErr *roachpb.Error
if raftCmd.WriteBatch != nil {
writeBatch = raftCmd.WriteBatch
var lResult *LocalEvalResult
if proposedLocally {
if proposalRetry != proposalNoRetry {
response.ProposalRetry = proposalRetry
if forcedErr == nil {
log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", proposal)
}
}

if forcedErr != nil {
// A forced error was set (i.e. we did not apply the proposal,
// for instance due to its log position).
response.Err = forcedErr
} else if proposal.Local.Err != nil {
// Everything went as expected, but this proposal should return
// an error to the client.
response.Err = proposal.Local.Err
} else if proposal.Local.Reply != nil {
response.Reply = proposal.Local.Reply
} else {
log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", proposal)
}
response.Intents = proposal.Local.detachIntents(response.Err != nil)
if !r.store.cfg.TestingKnobs.DisableRaftRespBeforeApplication && false {
// If not disabled by tests, we can return the response to the
// client here, before actually performing the Raft command
// application. This is valid because at this point the command has
// been replicated in the Raft log and therefore must apply to all
// replicated state machines. In addition, all error handling that
// is deterministic across replicas has already been performed in
// checkForcedErrLocked. Any error handling that will differ between
// replicas should not be reflected in the response (i.e.
// ReplicaCorruptionError).
//
// For more discussion, see #17500.
proposal.maybeRespondToClient(response)
}

lResult = proposal.Local
}

if forcedErr != nil {
// Apply an empty entry.
raftCmd.ReplicatedEvalResult = storagebase.ReplicatedEvalResult{}
raftCmd.WriteBatch = nil
}
raftCmd.ReplicatedEvalResult.State.RaftAppliedIndex = index
raftCmd.ReplicatedEvalResult.State.LeaseAppliedIndex = leaseIndex

// AddSSTable ingestions run before the actual batch. This makes sure
// that when the Raft command is applied, the ingestion has definitely
// succeeded. Note that we have taken precautions during command
@@ -4396,54 +4437,51 @@
raftCmd.ReplicatedEvalResult.AddSSTable = nil
}

raftCmd.ReplicatedEvalResult.Delta, pErr = r.applyRaftCommand(
var applyErr *roachpb.Error
var writeBatch *storagebase.WriteBatch
if raftCmd.WriteBatch != nil {
writeBatch = raftCmd.WriteBatch
}

raftCmd.ReplicatedEvalResult.Delta, applyErr = r.applyRaftCommand(
ctx, idKey, raftCmd.ReplicatedEvalResult, writeBatch)

if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; pErr == nil && filter != nil {
pErr = filter(storagebase.ApplyFilterArgs{
if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; applyErr == nil && filter != nil {
applyErr = filter(storagebase.ApplyFilterArgs{
CmdID: idKey,
ReplicatedEvalResult: raftCmd.ReplicatedEvalResult,
StoreID: r.store.StoreID(),
RangeID: r.RangeID,
})
}

pErr = r.maybeSetCorrupt(ctx, pErr)
if pErr == nil {
pErr = forcedErr
}

var lResult *LocalEvalResult
if proposedLocally {
if proposalRetry != proposalNoRetry {
response.ProposalRetry = proposalRetry
if pErr == nil {
log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", proposal)
}
}
if pErr != nil {
// A forced error was set (i.e. we did not apply the proposal,
// for instance due to its log position) or the Replica is now
// corrupted.
response.Err = pErr
} else if proposal.Local.Err != nil {
// Everything went as expected, but this proposal should return
// an error to the client.
response.Err = proposal.Local.Err
} else if proposal.Local.Reply != nil {
response.Reply = proposal.Local.Reply
} else {
log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", proposal)
if applyErr = r.maybeSetCorrupt(ctx, applyErr); applyErr != nil {
switch applyErr.GetDetail().(type) {
case *roachpb.ReplicaCorruptionError:
// We already logged the error in maybeSetCorrupt. It is ok that we
// don't send these errors to the client immediately as they will be
// caught later now that the replica is marked as corrupt.
//
// On a more fundamental level, returning an error to the client
// here would actually be incorrect, because the entry has already
// been replicated through Raft. This means that a local
// ReplicaCorruptionError doesn't mean that all replicas are
// corrupted or that the command failed to commit. Once a command
// has been committed, the state machine application, even with
// respect to error handling, needs to be deterministic across all
// replicas.
default:
// If the error is not a ReplicaCorruptionError then we should
// have returned it to the client.
log.Fatalf(ctx, "found unexpected error during Raft application: %v", applyErr)
}
response.Intents = proposal.Local.detachIntents(response.Err != nil)
lResult = proposal.Local
}

// Handle the EvalResult, executing any side effects of the last
// state machine transition.
//
// Note that this must happen after committing (the engine.Batch), but
// before notifying a potentially waiting client.
// before finishing the Raft application and calling endCmds.done.
r.handleEvalResultRaftMuLocked(ctx, lResult, raftCmd.ReplicatedEvalResult)
}

47 changes: 31 additions & 16 deletions pkg/storage/replica_proposal.go
@@ -79,7 +79,7 @@ type ProposalData struct {
// proposalResult come from LocalEvalResult).
//
// Attention: this channel is not to be signaled directly downstream of Raft.
// Always use ProposalData.finishRaftApplication().
// Always use ProposalData.maybeRespondToClient or finishRaftApplication.
doneCh chan proposalResult

// Local contains the results of evaluating the request
@@ -99,28 +99,43 @@ }
}

// finishRaftApplication is called downstream of Raft when a command application
// has finished. proposal.doneCh is signaled with pr so that the proposer is
// unblocked.
// has finished. Replica state is updated so that future proposals see the
// effect of this proposal, notably on the timestamp cache and command queue.
// After this, proposal.doneCh is signaled with pr so that the proposer is
// unblocked and free to respond to the client if it hasn't been already.
//
// It first invokes the endCmds function and then sends the specified
// proposalResult on the proposal's done channel. endCmds is invoked here in
// order to allow the original client to be cancelled and possibly no longer
// listening to this done channel, and so can't be counted on to invoke endCmds
// itself.
// Because the update to the command queue will allow dependent commands to
// begin proposing, proposer evaluated kv requires that all desired effects of
// this proposal be reflected in the underlying state machine (i.e. the engine)
// before this method is called. See Replica.applyRaftCommand.
//
// Note: this should not be called upstream of Raft because, in case pr.Err is
// set, it clears the intents from pr before sending it on the channel. This
// clearing should not be done upstream of Raft because, in cases of errors
// encountered upstream of Raft, we might still want to resolve intents:
// upstream of Raft, pr.intents represent intents encountered by a request, not
// the current txn's intents.
// Note: this should not be called upstream of Raft.
func (proposal *ProposalData) finishRaftApplication(pr proposalResult) {
if proposal.endCmds != nil {
proposal.endCmds.done(pr.Reply, pr.Err, pr.ProposalRetry)
proposal.endCmds = nil
}
proposal.doneCh <- pr
close(proposal.doneCh)
proposal.maybeRespondToClient(pr)
}

// maybeRespondToClient is called downstream of Raft when a command result can
// be delivered to the client RPC. proposal.doneCh is signaled with pr so that
// the proposer is unblocked.
//
// It is possible that this is called ahead of finishRaftApplication, in which
// case the future call to that method will not also respond to the client. This
// is because there are cases where it is valid to respond to the client with a
// proposal result before actually applying the proposal to the Raft state
// machine. However, it is not valid to call endCmds.done until after the state
// machine update.
//
// Note: this should not be called upstream of Raft.
func (proposal *ProposalData) maybeRespondToClient(pr proposalResult) {
if proposal.doneCh != nil {
proposal.doneCh <- pr
close(proposal.doneCh)
proposal.doneCh = nil
}
}

// LocalEvalResult is data belonging to an evaluated command that is
