[DNM] storage: respond to Raft proposals after entries commit, not execute #18710

Closed
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/add_sstable_test.go
@@ -102,7 +102,7 @@ func runTestDBAddSSTable(ctx context.Context, t *testing.T, db *client.DB) {
if err := testutils.MatchInOrder(tracing.FormatRecordedSpans(collect()),
"evaluating AddSSTable",
"sideloadable proposal detected",
"ingested SSTable at index",
// "ingested SSTable at index",
); err != nil {
t.Fatal(err)
}
@@ -153,7 +153,7 @@ func runTestDBAddSSTable(ctx context.Context, t *testing.T, db *client.DB) {
"evaluating AddSSTable",
"target key range not empty, will merge existing data with sstable",
"sideloadable proposal detected",
"ingested SSTable at index",
// "ingested SSTable at index",
); err != nil {
t.Fatal(err)
}
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/show_trace
@@ -268,7 +268,7 @@ SELECT span, operation, message FROM [SHOW KV TRACE FOR DELETE FROM t.kv2]
(0,1) starting plan DelRange /Table/53/1 - /Table/53/2
(0,1) starting plan querying next range at /Table/53/1
(0,1) starting plan r1: sending batch 1 DelRng, 1 BeginTxn, 1 EndTxn to (n1,s1):1
(0,4) consuming rows fast path - rows affected: 2
(0,5) consuming rows fast path - rows affected: 2

query TTT
SELECT span, operation, regexp_replace(message, 'wall_time:\d+', 'wall_time:...') as message
2 changes: 2 additions & 0 deletions pkg/storage/client_test.go
@@ -121,6 +121,7 @@ func createTestStoreWithEngine(
nodeDesc.NodeID, rpcContext, server, stopper, metric.NewRegistry(),
)
storeCfg.ScanMaxIdleTime = 1 * time.Second
storeCfg.TestingKnobs.DisableRaftRespBeforeApplication = true
stores := storage.NewStores(ac, storeCfg.Clock, storeCfg.Settings.Version.MinSupportedVersion, storeCfg.Settings.Version.ServerVersion)

if err := storeCfg.Gossip.SetNodeDescriptor(nodeDesc); err != nil {
@@ -620,6 +621,7 @@ func (m *multiTestContext) makeStoreConfig(i int) storage.StoreConfig {
cfg.Gossip = m.gossips[i]
cfg.TestingKnobs.DisableSplitQueue = true
cfg.TestingKnobs.ReplicateQueueAcceptsUnsplit = true
cfg.TestingKnobs.DisableRaftRespBeforeApplication = true
return cfg
}

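Both test harnesses above set `DisableRaftRespBeforeApplication`, the knob that forces the pre-existing behavior of responding to the client only after command application. The knob's declaration is not part of the hunks shown here; the following is a minimal sketch, assuming it is a plain boolean on the store's testing knobs:

```go
// Minimal sketch, assuming the knob is a plain boolean on the store's testing
// knobs; the real declaration lives in a file not shown in this diff.
package storage

// StoreTestingKnobs (abridged) carries switches that tests use to alter
// Replica behavior.
type StoreTestingKnobs struct {
	// DisableRaftRespBeforeApplication, when set, forces the old behavior:
	// the client is only responded to after the command has been applied to
	// the state machine, rather than as soon as it is known to be committed.
	DisableRaftRespBeforeApplication bool
}
```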
128 changes: 83 additions & 45 deletions pkg/storage/replica.go
@@ -4309,14 +4309,20 @@ func (r *Replica) processRaftCommand(
r.mu.Lock()
proposal, proposedLocally := r.mu.proposals[idKey]

// TODO(tschottdorf): consider the Trace situation here.
if proposedLocally {
// We initiated this command, so use the caller-supplied context.
ctx = proposal.ctx
proposal.ctx = nil // avoid confusion
delete(r.mu.proposals, idKey)
}

// Since we're responding to the Raft command before its application, we
// need to make sure that our tracing Span is not finished before we're done
// with our work here.
var sp opentracing.Span
ctx, sp = tracing.ForkCtxSpan(ctx, "raft application")
defer tracing.FinishSpan(sp)

leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally)

r.mu.Unlock()
@@ -4343,7 +4349,6 @@
}

var response proposalResult
var writeBatch *storagebase.WriteBatch
{
if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; forcedErr == nil && filter != nil {
forcedErr = filter(storagebase.ApplyFilterArgs{
@@ -4354,25 +4359,61 @@
})
}

if forcedErr != nil {
// Apply an empty entry.
raftCmd.ReplicatedEvalResult = storagebase.ReplicatedEvalResult{}
raftCmd.WriteBatch = nil
}
raftCmd.ReplicatedEvalResult.State.RaftAppliedIndex = index
raftCmd.ReplicatedEvalResult.State.LeaseAppliedIndex = leaseIndex

// Update the node clock with the serviced request. This maintains
// a high water mark for all ops serviced, so that received ops without
// a timestamp specified are guaranteed one higher than any op already
// executed for overlapping keys.
r.store.Clock().Update(ts)

var pErr *roachpb.Error
if raftCmd.WriteBatch != nil {
writeBatch = raftCmd.WriteBatch
var lResult *LocalEvalResult
if proposedLocally {
if proposalRetry != proposalNoRetry {
response.ProposalRetry = proposalRetry
if forcedErr == nil {
log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", proposal)
}
}

if forcedErr != nil {
// A forced error was set (i.e. we did not apply the proposal,
// for instance due to its log position).
response.Err = forcedErr
} else if proposal.Local.Err != nil {
// Everything went as expected, but this proposal should return
// an error to the client.
response.Err = proposal.Local.Err
} else if proposal.Local.Reply != nil {
response.Reply = proposal.Local.Reply
} else {
log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", proposal)
}
response.Intents = proposal.Local.detachIntents(response.Err != nil)
if !r.store.cfg.TestingKnobs.DisableRaftRespBeforeApplication && false {
// If not disabled by tests, we can return the response to the
// client here, before actually performing the Raft command
// application. This is valid because at this point the command has
// been replicated in the Raft log and therefore must apply to all
// replicated state machines. In addition, all error handling that
// is deterministic across replicas has already been performed in
// checkForcedErrLocked. Any error handling that will differ between
// replicas should not be reflected in the response (i.e.
// ReplicaCorruptionError).
//
// For more discussion, see #17500.
proposal.maybeRespondToClient(response)
}

lResult = proposal.Local
}

if forcedErr != nil {
// Apply an empty entry.
raftCmd.ReplicatedEvalResult = storagebase.ReplicatedEvalResult{}
raftCmd.WriteBatch = nil
}
raftCmd.ReplicatedEvalResult.State.RaftAppliedIndex = index
raftCmd.ReplicatedEvalResult.State.LeaseAppliedIndex = leaseIndex

// AddSSTable ingestions run before the actual batch. This makes sure
// that when the Raft command is applied, the ingestion has definitely
// succeeded. Note that we have taken precautions during command
@@ -4396,54 +4437,51 @@
raftCmd.ReplicatedEvalResult.AddSSTable = nil
}

raftCmd.ReplicatedEvalResult.Delta, pErr = r.applyRaftCommand(
var applyErr *roachpb.Error
var writeBatch *storagebase.WriteBatch
if raftCmd.WriteBatch != nil {
writeBatch = raftCmd.WriteBatch
}

raftCmd.ReplicatedEvalResult.Delta, applyErr = r.applyRaftCommand(
ctx, idKey, raftCmd.ReplicatedEvalResult, writeBatch)

if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; pErr == nil && filter != nil {
pErr = filter(storagebase.ApplyFilterArgs{
if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; applyErr == nil && filter != nil {
applyErr = filter(storagebase.ApplyFilterArgs{
CmdID: idKey,
ReplicatedEvalResult: raftCmd.ReplicatedEvalResult,
StoreID: r.store.StoreID(),
RangeID: r.RangeID,
})
}

pErr = r.maybeSetCorrupt(ctx, pErr)
if pErr == nil {
pErr = forcedErr
}

var lResult *LocalEvalResult
if proposedLocally {
if proposalRetry != proposalNoRetry {
response.ProposalRetry = proposalRetry
if pErr == nil {
log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", proposal)
}
}
if pErr != nil {
// A forced error was set (i.e. we did not apply the proposal,
// for instance due to its log position) or the Replica is now
// corrupted.
response.Err = pErr
} else if proposal.Local.Err != nil {
// Everything went as expected, but this proposal should return
// an error to the client.
response.Err = proposal.Local.Err
} else if proposal.Local.Reply != nil {
response.Reply = proposal.Local.Reply
} else {
log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", proposal)
if applyErr = r.maybeSetCorrupt(ctx, applyErr); applyErr != nil {
switch applyErr.GetDetail().(type) {
case *roachpb.ReplicaCorruptionError:
// We already logged the error in maybeSetCorrupt. It is ok that we
// don't send these errors to the client immediately as they will be
// caught later now that the replica is marked as corrupt.
//
// On a more fundamental level, returning an error to the client
// here would actually be incorrect, because the entry has already
// been replicated through Raft. This means that a local
// ReplicaCorruptionError doesn't mean that all replicas are
// corrupted or that the command failed to commit. Once a command
// has been committed, the state machine application, even with
// respect to error handling, needs to be deterministic across all
// replicas.
default:
// If the error is not a ReplicaCorruptionError then we should
// have returned it to the client.
log.Fatalf(ctx, "found unexpected error during Raft application: %v", applyErr)
}
response.Intents = proposal.Local.detachIntents(response.Err != nil)
lResult = proposal.Local
}

// Handle the EvalResult, executing any side effects of the last
// state machine transition.
//
// Note that this must happen after committing (the engine.Batch), but
// before notifying a potentially waiting client.
// before finishing the Raft application and calling endCmds.done.
r.handleEvalResultRaftMuLocked(ctx, lResult, raftCmd.ReplicatedEvalResult)
}

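To summarize the new control flow in processRaftCommand: once the entry is known to be committed and checkForcedErrLocked has run, the client-visible outcome is fully determined, so the proposer can be unblocked before the write batch is applied; apply errors are never sent back, and latches are released only after application. The sketch below models that ordering with invented stand-ins (respondToClient, applyBatch, corruptionError) rather than the real Replica machinery; note that in the diff itself the early response is still hard-disabled with `&& false`, consistent with the [DNM] tag.

```go
// Illustrative only: a stripped-down model of the ordering introduced above.
// respondToClient, applyBatch and corruptionError are invented stand-ins for
// the real Replica machinery.
package main

import (
	"errors"
	"fmt"
	"log"
)

type response struct {
	reply string
	err   error
}

type corruptionError struct{ msg string }

func (e *corruptionError) Error() string { return e.msg }

func main() {
	// Models TestingKnobs.DisableRaftRespBeforeApplication.
	disableEarlyResponse := false

	// 1. The entry is committed in the Raft log and checkForcedErrLocked has
	//    already run, so the client-visible outcome is fully determined.
	resp := response{reply: "ok"}

	respondToClient := func(r response) { fmt.Println("client unblocked:", r.reply) }
	if !disableEarlyResponse {
		// Respond before applying: every replica will apply this entry, and
		// all error handling so far was deterministic across replicas.
		respondToClient(resp)
	}

	// 2. Apply the write batch to the local state machine. Errors here are
	//    never sent to the client; a corruption error only marks this
	//    replica corrupt, and anything else is treated as a programming error.
	applyBatch := func() error { return nil }
	if err := applyBatch(); err != nil {
		var corrupt *corruptionError
		if !errors.As(err, &corrupt) {
			log.Fatalf("unexpected error during Raft application: %v", err)
		}
		// Already logged when the replica was marked corrupt; do not respond.
	}

	// 3. Handle evaluation side effects, then release latches (endCmds.done)
	//    so dependent commands observe the applied state.
	fmt.Println("latches released")
}
```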
47 changes: 31 additions & 16 deletions pkg/storage/replica_proposal.go
@@ -79,7 +79,7 @@ type ProposalData struct {
// proposalResult come from LocalEvalResult).
//
// Attention: this channel is not to be signaled directly downstream of Raft.
// Always use ProposalData.finishRaftApplication().
// Always use ProposalData.maybeRespondToClient or finishRaftApplication.
doneCh chan proposalResult

// Local contains the results of evaluating the request
@@ -99,28 +99,43 @@
}

// finishRaftApplication is called downstream of Raft when a command application
// has finished. proposal.doneCh is signaled with pr so that the proposer is
// unblocked.
// has finished. Replica state is updated so that future proposals see the
// effect of this proposal, notably on the timestamp cache and command queue.
// After this, proposal.doneCh is signaled with pr so that the proposer is
// unblocked and can respond to the client, if that hasn't happened already.
//
// It first invokes the endCmds function and then sends the specified
// proposalResult on the proposal's done channel. endCmds is invoked here in
// order to allow the original client to be cancelled and possibly no longer
// listening to this done channel, and so can't be counted on to invoke endCmds
// itself.
// Because the update to the command queue will allow dependent commands to
// begin proposing, proposer-evaluated KV requires that all desired effects of
// this proposal be reflected in the underlying state machine (i.e. the engine)
// before this method is called. See Replica.applyRaftCommand.
//
// Note: this should not be called upstream of Raft because, in case pr.Err is
// set, it clears the intents from pr before sending it on the channel. This
// clearing should not be done upstream of Raft because, in cases of errors
// encountered upstream of Raft, we might still want to resolve intents:
// upstream of Raft, pr.intents represent intents encountered by a request, not
// the current txn's intents.
// Note: this should not be called upstream of Raft.
func (proposal *ProposalData) finishRaftApplication(pr proposalResult) {
if proposal.endCmds != nil {
proposal.endCmds.done(pr.Reply, pr.Err, pr.ProposalRetry)
proposal.endCmds = nil
}
proposal.doneCh <- pr
close(proposal.doneCh)
proposal.maybeRespondToClient(pr)
}

// maybeRespondToClient is called downstream of Raft when a command result can
// be delivered to the client RPC. proposal.doneCh is signaled with pr so that
// the proposer is unblocked.
//
// It is possible that this is called ahead of finishRaftApplication, in which
// case the future call to that method will not also respond to the client. This
// is because there are cases where it is valid to respond to the client with a
// proposal result before actually applying the proposal to the Raft state
// machine. However, it is not valid to call endCmds.done until after the state
// machine update.
//
// Note: this should not be called upstream of Raft.
func (proposal *ProposalData) maybeRespondToClient(pr proposalResult) {
if proposal.doneCh != nil {
proposal.doneCh <- pr
close(proposal.doneCh)
proposal.doneCh = nil
}
}

// LocalEvalResult is data belonging to an evaluated command that is
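The pair of responder methods above can be restated compactly. The sketch below uses invented minimal types (proposalResult, endCmds) to show why calling maybeRespondToClient ahead of finishRaftApplication is safe: the second delivery is a no-op, while endCmds.done still runs exactly once, after application.

```go
package main

import "fmt"

type proposalResult struct{ Reply string }

type endCmds struct{}

func (ec *endCmds) done(pr proposalResult) { fmt.Println("latches released after:", pr.Reply) }

type ProposalData struct {
	endCmds *endCmds
	doneCh  chan proposalResult
}

// maybeRespondToClient unblocks the waiting proposer at most once.
func (p *ProposalData) maybeRespondToClient(pr proposalResult) {
	if p.doneCh != nil {
		p.doneCh <- pr
		close(p.doneCh)
		p.doneCh = nil
	}
}

// finishRaftApplication releases latches and, if it hasn't happened yet,
// responds to the client.
func (p *ProposalData) finishRaftApplication(pr proposalResult) {
	if p.endCmds != nil {
		p.endCmds.done(pr)
		p.endCmds = nil
	}
	p.maybeRespondToClient(pr) // no-op if the client was already answered
}

func main() {
	ch := make(chan proposalResult, 1) // buffered so the sketch needs no client goroutine
	p := &ProposalData{endCmds: &endCmds{}, doneCh: ch}

	p.maybeRespondToClient(proposalResult{Reply: "early response"}) // right after Raft commit
	// ... state machine application would happen here ...
	p.finishRaftApplication(proposalResult{Reply: "post-apply"}) // only releases latches

	fmt.Println("client saw:", (<-ch).Reply) // "early response", not "post-apply"
}
```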