Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: remove unnecessary sideload inlining, add assertion #20573

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 9 additions & 33 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3504,10 +3504,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// Update protected state (last index, last term, raft log size and raft
// leader ID) and set raft log entry cache. We clear any older, uncommitted
// log entries and cache the latest ones.
//
// Note also that we're likely to send messages related to the Entries we
// just appended, and these entries need to be inlined when sending them to
// followers - populating the cache here saves a lot of that work.
r.mu.Lock()
r.store.raftEntryCache.addEntries(r.RangeID, rd.Entries)
r.mu.lastIndex = lastIndex
Expand Down Expand Up @@ -4180,35 +4176,15 @@ func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Messag
drop := false
switch message.Type {
case raftpb.MsgApp:
// Iterate over the entries to inline sideloaded commands.
for j := range message.Entries {
cow := false
newEnt, err := maybeInlineSideloadedRaftCommand(
ctx,
r.RangeID,
message.Entries[j],
r.raftMu.sideloaded,
r.store.raftEntryCache,
)
if err != nil {
// We can simply drop the message since it could always get lost
// in transit anyway.
log.Errorf(ctx, "while inlining sideloaded commands: %s", err)
drop = true
continue
}
if newEnt != nil {
if !cow {
cow = true
// Copy the whole slice to avoid data races. Entries are
// usually shared between multiple outgoing messages,
// and while it would be possible to only modify them
// only the first time around, that isn't easy to
// implement (since you have to do nontrivial work to
// decide whether inlining needs to happen).
message.Entries = append([]raftpb.Entry(nil), message.Entries...)
}
message.Entries[j] = *newEnt
if util.RaceEnabled {
// Iterate over the entries to assert that all sideloaded commands
// are already inlined. replicaRaftStorage.Entries already performs
// the sideload inlining for stable entries and raft.unstable always
// contain fat entries. Since these are the only two sources that
// raft.sendAppend gathers entries from to populate MsgApps, we
// should never see thin entries here.
for j := range message.Entries {
assertSideloadedRaftCommandInlined(ctx, &message.Entries[j])
}
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/storage/replica_sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,24 @@ func maybeInlineSideloadedRaftCommand(
}
return &ent, nil
}

// assertSideloadedRaftCommandInlined asserts that if the provided entry is a
// sideloaded entry, then its payload has already been inlined. Doing so
// requires unmarshalling the raft command, so this assertion should be kept out
// of performance critical paths.
func assertSideloadedRaftCommandInlined(ctx context.Context, ent *raftpb.Entry) {
if !sniffSideloadedRaftCommand(ent.Data) {
return
}

var command storagebase.RaftCommand
_, data := DecodeRaftCommand(ent.Data)
if err := protoutil.Unmarshal(data, &command); err != nil {
log.Fatal(ctx, err)
}

if len(command.ReplicatedEvalResult.AddSSTable.Data) == 0 {
// The entry is "thin", which is what this assertion is checking for.
log.Fatalf(ctx, "found thin sideloaded raft command: %+v", command)
}
}