Skip to content

Commit

Permalink
applyRaftCommand
Browse files Browse the repository at this point in the history
  • Loading branch information
tamird committed Jul 16, 2015
1 parent 8d17066 commit 140d855
Showing 1 changed file with 10 additions and 21 deletions.
31 changes: 10 additions & 21 deletions storage/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,11 +537,8 @@ func (r *Range) AddCmd(ctx context.Context, call proto.Call) error {

if err != nil {
replyHeader := call.Reply.Header()
// Ideally we wouldn't allow any error to be set here, but in the
// `addWriteCmd` case these replies go through the raft machinery
// which hasn't been audited yet.
if replyHeader.Error != nil && replyHeader.Error.Error() != replyHeader.GoError().Error() {
panic(fmt.Sprintf("cannot set error to %s, already set to %s", err, replyHeader.GoError()))
if replyHeader.Error != nil {
panic("the world is on fire")
}
replyHeader.SetGoError(err)
}
Expand Down Expand Up @@ -819,15 +816,12 @@ func (r *Range) processRaftCommand(idKey cmdIDKey, index uint64, raftCmd proto.I
ctx = r.context()
}

reply = args.CreateReply()

execDone := tracer.FromCtx(ctx).Epoch(fmt.Sprintf("applying %s", args.Method()))
// applyRaftCommand will return "expected" errors, but may also indicate
// replica corruption (as of now, signaled by a replicaCorruptionError).
// We feed its return through maybeSetCorrupt to act when that happens.
err := r.maybeSetCorrupt(
r.applyRaftCommand(ctx, index, proto.RaftNodeID(raftCmd.OriginNodeID), args, reply),
)
reply, err := r.applyRaftCommand(ctx, index, proto.RaftNodeID(raftCmd.OriginNodeID), args)
err = r.maybeSetCorrupt(err)
execDone()

if cmd != nil {
Expand All @@ -843,28 +837,21 @@ func (r *Range) processRaftCommand(idKey cmdIDKey, index uint64, raftCmd proto.I
// underlying state machine (i.e. the engine).
// When certain critical operations fail, a replicaCorruptionError may be
// returned and must be handled by the caller.
func (r *Range) applyRaftCommand(ctx context.Context, index uint64, originNode proto.RaftNodeID, args proto.Request, reply proto.Response) error {
func (r *Range) applyRaftCommand(ctx context.Context, index uint64, originNode proto.RaftNodeID, args proto.Request) (proto.Response, error) {
if index <= 0 {
log.Fatalc(ctx, "raft command index is <= 0")
}

// If we have an out of order index, there's corruption. No sense in trying
// to update anything or run the command. Simply return a corruption error.
if oldIndex := atomic.LoadUint64(&r.appliedIndex); oldIndex >= index {
return newReplicaCorruptionError(util.Errorf("applied index moved backwards: %d >= %d", oldIndex, index))
return nil, newReplicaCorruptionError(util.Errorf("applied index moved backwards: %d >= %d", oldIndex, index))
}

// Call the helper, which returns a batch containing data written
// during command execution and any associated error.
ms := engine.MVCCStats{}
batch, batchReply, rErr := r.applyRaftCommandInBatch(ctx, index, originNode, args, &ms)
if batchReply != nil {
gogoproto.Merge(reply, batchReply)
}
// ALWAYS set the reply header error to the error returned by the
// helper. This is the definitive result of the execution. The
// error must be set before saving to the response cache.
reply.Header().SetGoError(rErr)
batch, reply, rErr := r.applyRaftCommandInBatch(ctx, index, originNode, args, &ms)
defer batch.Close()

// Advance the last applied index and commit the batch.
Expand Down Expand Up @@ -896,7 +883,7 @@ func (r *Range) applyRaftCommand(ctx context.Context, index uint64, originNode p
}
}

return rErr
return reply, rErr
}

// applyRaftCommandInBatch executes the command in a batch engine and
Expand Down Expand Up @@ -937,6 +924,8 @@ func (r *Range) applyRaftCommandInBatch(ctx context.Context, index uint64, origi
if log.V(1) {
log.Infoc(ctx, "found response cache entry for %+v", args.Header().CmdID)
}
// TODO(tamird): move this into the response cache itself
defer func() { reply.Header().Error = nil }()
// We successfully read from the response cache, so return whatever error
// was present in the cached entry (if any).
return batch, reply, reply.Header().GoError()
Expand Down

0 comments on commit 140d855

Please sign in to comment.