Skip to content

Commit

Permalink
Merge pull request cockroachdb#18681 from tschottdorf/handleraft-error
Browse files Browse the repository at this point in the history
storage: expose site of error return
  • Loading branch information
tbg authored Sep 23, 2017
2 parents 404b1c7 + f3935a9 commit 241b883
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 31 deletions.
71 changes: 47 additions & 24 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3165,17 +3165,23 @@ type handleRaftReadyStats struct {
// are ready to read, be saved to stable storage, committed or sent to other
// peers. It takes a non-empty IncomingSnapshot to indicate that it is
// about to process a snapshot.
func (r *Replica) handleRaftReady(inSnap IncomingSnapshot) (handleRaftReadyStats, error) {
//
// The returned string is nonzero whenever an error is returned to give a
// non-sensitive cue as to what happened.
func (r *Replica) handleRaftReady(inSnap IncomingSnapshot) (handleRaftReadyStats, string, error) {
r.raftMu.Lock()
defer r.raftMu.Unlock()
return r.handleRaftReadyRaftMuLocked(inSnap)
}

// handleRaftReadyLocked is the same as handleRaftReady but requires that the
// replica's raftMu be held.
//
// The returned string is nonzero whenever an error is returned to give a
// non-sensitive cue as to what happened.
func (r *Replica) handleRaftReadyRaftMuLocked(
inSnap IncomingSnapshot,
) (handleRaftReadyStats, error) {
) (handleRaftReadyStats, string, error) {
var stats handleRaftReadyStats

ctx := r.AnnotateCtx(context.TODO())
Expand Down Expand Up @@ -3218,11 +3224,12 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
})
r.mu.Unlock()
if err != nil {
return stats, err
const expl = "while checking raft group for Ready"
return stats, expl, errors.Wrap(err, expl)
}

if !hasReady {
return stats, nil
return stats, "", nil
}

logRaftReady(ctx, rd)
Expand All @@ -3248,7 +3255,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
if !raft.IsEmptySnap(rd.Snapshot) {
snapUUID, err := uuid.FromBytes(rd.Snapshot.Data)
if err != nil {
return stats, errors.Wrap(err, "invalid snapshot id")
const expl = "invalid snapshot id"
return stats, expl, errors.Wrap(err, expl)
}
if inSnap.SnapUUID == (uuid.UUID{}) {
log.Fatalf(ctx, "programming error: a snapshot application was attempted outside of the streaming snapshot codepath")
Expand All @@ -3258,7 +3266,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}

if err := r.applySnapshot(ctx, inSnap, rd.Snapshot, rd.HardState); err != nil {
return stats, err
const expl = "while applying snapshot"
return stats, expl, errors.Wrap(err, expl)
}

if err := func() error {
Expand All @@ -3268,12 +3277,10 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
if r.store.removePlaceholderLocked(ctx, r.RangeID) {
atomic.AddInt32(&r.store.counts.filledPlaceholders, 1)
}
if err := r.store.processRangeDescriptorUpdateLocked(ctx, r); err != nil {
return errors.Wrap(err, "could not processRangeDescriptorUpdate after applySnapshot")
}
return nil
return r.store.processRangeDescriptorUpdateLocked(ctx, r)
}(); err != nil {
return stats, err
const expl = "could not processRangeDescriptorUpdate after applySnapshot"
return stats, expl, errors.Wrap(err, expl)
}

// r.mu.lastIndex, r.mu.lastTerm and r.mu.raftLogSize were updated in
Expand Down Expand Up @@ -3311,18 +3318,21 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// last index.
thinEntries, sideLoadedEntriesSize, err := r.maybeSideloadEntriesRaftMuLocked(ctx, rd.Entries)
if err != nil {
return stats, err
const expl = "during sideloading"
return stats, expl, errors.Wrap(err, expl)
}
raftLogSize += sideLoadedEntriesSize
if lastIndex, lastTerm, raftLogSize, err = r.append(
ctx, writer, lastIndex, lastTerm, raftLogSize, thinEntries,
); err != nil {
return stats, err
const expl = "during append"
return stats, expl, errors.Wrap(err, expl)
}
}
if !raft.IsEmptyHardState(rd.HardState) {
if err := r.raftMu.stateLoader.setHardState(ctx, writer, rd.HardState); err != nil {
return stats, err
const expl = "during setHardState"
return stats, expl, errors.Wrap(err, expl)
}
}
writer.Close()
Expand All @@ -3341,7 +3351,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// infer the that entries are persisted on the node that sends a snapshot.
start := timeutil.Now()
if err := batch.Commit(syncRaftLog.Get(&r.store.cfg.Settings.SV) && rd.MustSync); err != nil {
return stats, err
const expl = "while committing batch"
return stats, expl, errors.Wrap(err, expl)
}
elapsed := timeutil.Since(start)
r.store.metrics.RaftLogCommitLatency.RecordValue(elapsed.Nanoseconds())
Expand All @@ -3358,7 +3369,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
for i := firstPurge; i <= lastPurge; i++ {
err := r.raftMu.sideloaded.Purge(ctx, i, purgeTerm)
if err != nil && errors.Cause(err) != errSideloadedFileNotFound {
return stats, errors.Wrapf(err, "while purging index %d", i)
const expl = "while purging index %d"
return stats, expl, errors.Wrapf(err, expl, i)
}
}
}
Expand Down Expand Up @@ -3429,7 +3441,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
if newEnt, err := maybeInlineSideloadedRaftCommand(
ctx, r.RangeID, e, r.raftMu.sideloaded, r.store.raftEntryCache,
); err != nil {
return stats, err
const expl = "maybeInlineSideloadedRaftCommand"
return stats, expl, errors.Wrap(err, expl)
} else if newEnt != nil {
e = *newEnt
}
Expand Down Expand Up @@ -3460,7 +3473,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
if len(encodedCommand) == 0 {
commandID = ""
} else if err := proto.Unmarshal(encodedCommand, &command); err != nil {
return stats, err
const expl = "while unmarshalling entry"
return stats, expl, errors.Wrap(err, expl)
}
}

Expand All @@ -3487,15 +3501,19 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := proto.Unmarshal(e.Data, &cc); err != nil {
return stats, err
const expl = "while unmarshaling ConfChange"
return stats, expl, errors.Wrap(err, expl)
}
var ccCtx ConfChangeContext
if err := proto.Unmarshal(cc.Context, &ccCtx); err != nil {
return stats, err
const expl = "while unmarshaling ConfChangeContext"
return stats, expl, errors.Wrap(err, expl)

}
var command storagebase.RaftCommand
if err := proto.Unmarshal(ccCtx.Payload, &command); err != nil {
return stats, err
const expl = "while unmarshaling RaftCommand"
return stats, expl, errors.Wrap(err, expl)
}
commandID := storagebase.CmdIDKey(ccCtx.CommandID)
if changedRepl := r.processRaftCommand(
Expand All @@ -3519,7 +3537,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
raftGroup.ApplyConfChange(cc)
return true, nil
}); err != nil {
return stats, err
const expl = "during ApplyConfChange"
return stats, expl, errors.Wrap(err, expl)
}
default:
log.Fatalf(ctx, "unexpected Raft entry: %v", e)
Expand All @@ -3534,10 +3553,14 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// TODO(bdarnell): need to check replica id and not Advance if it
// has changed. Or do we need more locking to guarantee that replica
// ID cannot change during handleRaftReady?
return stats, r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
const expl = "during advance"
if err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
raftGroup.Advance(rd)
return true, nil
})
}); err != nil {
return stats, expl, errors.Wrap(err, expl)
}
return stats, "", nil
}

// tick the Raft group, returning any error and true if the raft group exists
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6650,11 +6650,11 @@ func TestQuotaPoolAccessOnDestroyedReplica(t *testing.T) {
t.Fatal(err)
}

if _, err := repl.handleRaftReady(IncomingSnapshot{}); err != nil {
if _, _, err := repl.handleRaftReady(IncomingSnapshot{}); err != nil {
t.Fatal(err)
}

if _, err := repl.handleRaftReady(IncomingSnapshot{}); err != nil {
if _, _, err := repl.handleRaftReady(IncomingSnapshot{}); err != nil {
t.Fatal(err)
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3190,9 +3190,9 @@ func (s *Store) processRaftRequest(
return roachpb.NewError(err)
}

if _, err := r.handleRaftReadyRaftMuLocked(inSnap); err != nil {
// mimic the behavior in processRaft.
log.Fatal(ctx, err)
if _, expl, err := r.handleRaftReadyRaftMuLocked(inSnap); err != nil {
// Mimic the behavior in processRaft.
log.Fatalf(ctx, "%s: %s", log.Safe(expl), err) // TODO(bdarnell)
}
removePlaceholder = false
return nil
Expand Down Expand Up @@ -3575,9 +3575,9 @@ func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) {

start := timeutil.Now()
r := (*Replica)(value)
stats, err := r.handleRaftReady(IncomingSnapshot{})
stats, expl, err := r.handleRaftReady(IncomingSnapshot{})
if err != nil {
log.Fatal(ctx, err) // TODO(bdarnell)
log.Fatalf(ctx, "%s: %s", log.Safe(expl), err) // TODO(bdarnell)
}
elapsed := timeutil.Since(start)
s.metrics.RaftWorkingDurationNanos.Inc(elapsed.Nanoseconds())
Expand Down

0 comments on commit 241b883

Please sign in to comment.