storage: sync entries to disk in parallel with followers
Referenced in #17500.

This change implements the optimization in the Raft thesis under the
section: 10.2.1 Writing to the leader’s disk in parallel. The optimization
allows the leader to sync new entries to its disk after it has sent the
corresponding `MsgApp` messages, instead of before.

Here, we invoke this optimization by:
1. sending all MsgApps.
2. syncing all entries and Raft state to disk.
3. sending all other messages.

Release note (performance improvement): Raft followers now write to
their disks in parallel with the leader.
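Concretely, the reordering amounts to the shape of Ready handling sketched below against a generic etcd/raft Ready. The transport and storage interfaces and the handleReady helper are illustrative stand-ins, not CockroachDB's actual types; only the ordering of the three steps mirrors this change, and the message split is done with a simple append-based loop rather than the in-place splitMsgApps helper added in this commit.

package raftready

import (
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
)

// transport and storage are hypothetical stand-ins for the node's outgoing
// message sender and its durable Raft log.
type transport interface {
	Send(msgs []raftpb.Message)
}

type storage interface {
	// Append persists entries and hard state, syncing to disk before returning.
	Append(hs raftpb.HardState, ents []raftpb.Entry) error
}

func handleReady(rd raft.Ready, tr transport, st storage) error {
	// Partition outgoing messages into MsgApps and everything else.
	var msgApps, otherMsgs []raftpb.Message
	for _, m := range rd.Messages {
		if m.Type == raftpb.MsgApp {
			msgApps = append(msgApps, m)
		} else {
			otherMsgs = append(otherMsgs, m)
		}
	}

	// 1. Send MsgApps first so followers can start writing the new entries to
	//    their disks in parallel with the leader's own sync.
	tr.Send(msgApps)

	// 2. Sync the new entries and Raft hard state to the leader's disk.
	if err := st.Append(rd.HardState, rd.Entries); err != nil {
		return err
	}

	// 3. Send the remaining messages (votes, heartbeats, MsgAppResps, ...)
	//    only after the local sync has completed.
	tr.Send(otherMsgs)
	return nil
}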
nvanbenschoten committed Dec 12, 2017
1 parent 5fb2c78 commit b67eb69
Showing 2 changed files with 144 additions and 5 deletions.
57 changes: 52 additions & 5 deletions pkg/storage/replica.go
@@ -3430,6 +3430,43 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}
}

// Separate the MsgApp messages from all other Raft message types so that we
// can take advantage of the optimization discussed in the Raft thesis under
// the section: `10.2.1 Writing to the leader’s disk in parallel`. The
// optimization suggests that instead of a leader writing new log entries to
// disk before replicating them to its followers, the leader can instead
// write the entries to disk in parallel with replicating to its followers,
// which in turn write the entries to their own disks.
//
// Here, we invoke this optimization by:
// 1. sending all MsgApps.
// 2. syncing all entries and Raft state to disk.
// 3. sending all other messages.
//
// Since this is all handled in handleRaftReadyRaftMuLocked, we're assured
// that even though we may sync new entries to disk after sending them in
// MsgApps to followers, we'll always have them synced to disk before we
// process followers' MsgAppResps for the corresponding entries because this
// entire method requires RaftMu to be locked. This is a requirement because
// etcd/raft does not support commit quorums that do not include the leader,
// even though the Raft thesis states that this would technically be safe:
// > The leader may even commit an entry before it has been written to its
// > own disk, if a majority of followers have written it to their disks;
// > this is still safe.
//
// However, MsgApps are also used to inform followers of committed entries
// through the Commit index that they contain. Because the optimization
// sends all MsgApps before syncing to disk, we may send out a Commit index
// in a MsgApp that we have not ourselves written in HardState.Commit. This
// is ok, because the Commit index can be treated as volatile state, as is
// supported by raft.MustSync. The Raft thesis corroborates this, stating in
// section: `3.8 Persisted state and server restarts` that:
// > Other state variables are safe to lose on a restart, as they can all be
// > recreated. The most interesting example is the commit index, which can
// > safely be reinitialized to zero on a restart.
msgApps, otherMsgs := splitMsgApps(rd.Messages)
r.sendRaftMessages(ctx, msgApps)

// Use a more efficient write-only batch because we don't need to do any
// reads from the batch. Any reads are performed via the "distinct" batch
// which passes the reads through to the underlying DB.
@@ -3504,10 +3541,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// Update protected state (last index, last term, raft log size and raft
// leader ID) and set raft log entry cache. We clear any older, uncommitted
// log entries and cache the latest ones.
//
// Note also that we're likely to send messages related to the Entries we
// just appended, and these entries need to be inlined when sending them to
// followers - populating the cache here saves a lot of that work.
r.mu.Lock()
r.store.raftEntryCache.addEntries(r.RangeID, rd.Entries)
r.mu.lastIndex = lastIndex
@@ -3519,7 +3552,7 @@
}
r.mu.Unlock()

r.sendRaftMessages(ctx, rd.Messages)
r.sendRaftMessages(ctx, otherMsgs)

for _, e := range rd.CommittedEntries {
switch e.Type {
@@ -3651,6 +3684,20 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
return stats, "", nil
}

// splitMsgApps splits the Raft message slice into two slices, one containing
// MsgApps and one containing all other message types. Each slice retains the
// relative ordering between messages in the original slice.
func splitMsgApps(msgs []raftpb.Message) (msgApps, otherMsgs []raftpb.Message) {
splitIdx := 0
for i, msg := range msgs {
if msg.Type == raftpb.MsgApp {
msgs[i], msgs[splitIdx] = msgs[splitIdx], msgs[i]
splitIdx++
}
}
return msgs[:splitIdx], msgs[splitIdx:]
}

func fatalOnRaftReadyErr(ctx context.Context, expl string, err error) {
// Mimic the behavior in processRaft.
log.Fatalf(ctx, "%s: %s", log.Safe(expl), err) // TODO(bdarnell)
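The comment added in replica.go above relies on the Commit index being volatile state: raft.MustSync does not demand a synchronous write for a bare Commit advance, so advertising a Commit index in a MsgApp before HardState.Commit is durable is safe. A minimal illustration of that point, assuming the etcd import path vendored at the time and made-up HardState values:

package main

import (
	"fmt"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
)

func main() {
	prev := raftpb.HardState{Term: 5, Vote: 2, Commit: 10}
	// Only the Commit index advanced; Term and Vote are unchanged and no new
	// entries were appended in this Ready.
	cur := raftpb.HardState{Term: 5, Vote: 2, Commit: 12}

	// A Commit-only change does not require a sync, which is what allows the
	// leader to send a Commit index in MsgApps before its own
	// HardState.Commit has been durably written.
	fmt.Println(raft.MustSync(cur, prev, 0)) // false

	// Appending new entries, by contrast, always requires a sync.
	fmt.Println(raft.MustSync(cur, prev, 3)) // true
}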
92 changes: 92 additions & 0 deletions pkg/storage/replica_test.go
@@ -8704,6 +8704,98 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
}
}

func TestSplitMsgApps(t *testing.T) {
defer leaktest.AfterTest(t)()

msgApp := func(idx uint64) raftpb.Message {
return raftpb.Message{Index: idx, Type: raftpb.MsgApp}
}
otherMsg := func(idx uint64) raftpb.Message {
return raftpb.Message{Index: idx, Type: raftpb.MsgVote}
}
formatMsgs := func(msgs []raftpb.Message) string {
strs := make([]string, len(msgs))
for i, msg := range msgs {
strs[i] = fmt.Sprintf("{%s:%d}", msg.Type, msg.Index)
}
return fmt.Sprint(strs)
}

testCases := []struct {
msgsIn, msgAppsOut, otherMsgsOut []raftpb.Message
}{
// No msgs.
{
msgsIn: []raftpb.Message{},
msgAppsOut: []raftpb.Message{},
otherMsgsOut: []raftpb.Message{},
},
// Only msgApps.
{
msgsIn: []raftpb.Message{msgApp(1)},
msgAppsOut: []raftpb.Message{msgApp(1)},
otherMsgsOut: []raftpb.Message{},
},
{
msgsIn: []raftpb.Message{msgApp(1), msgApp(2)},
msgAppsOut: []raftpb.Message{msgApp(1), msgApp(2)},
otherMsgsOut: []raftpb.Message{},
},
{
msgsIn: []raftpb.Message{msgApp(2), msgApp(1)},
msgAppsOut: []raftpb.Message{msgApp(2), msgApp(1)},
otherMsgsOut: []raftpb.Message{},
},
// Only otherMsgs.
{
msgsIn: []raftpb.Message{otherMsg(1)},
msgAppsOut: []raftpb.Message{},
otherMsgsOut: []raftpb.Message{otherMsg(1)},
},
{
msgsIn: []raftpb.Message{otherMsg(1), otherMsg(2)},
msgAppsOut: []raftpb.Message{},
otherMsgsOut: []raftpb.Message{otherMsg(1), otherMsg(2)},
},
{
msgsIn: []raftpb.Message{otherMsg(2), otherMsg(1)},
msgAppsOut: []raftpb.Message{},
otherMsgsOut: []raftpb.Message{otherMsg(2), otherMsg(1)},
},
// Mixed msgApps and otherMsgs.
{
msgsIn: []raftpb.Message{msgApp(1), otherMsg(2)},
msgAppsOut: []raftpb.Message{msgApp(1)},
otherMsgsOut: []raftpb.Message{otherMsg(2)},
},
{
msgsIn: []raftpb.Message{otherMsg(1), msgApp(2)},
msgAppsOut: []raftpb.Message{msgApp(2)},
otherMsgsOut: []raftpb.Message{otherMsg(1)},
},
{
msgsIn: []raftpb.Message{msgApp(1), otherMsg(2), msgApp(3)},
msgAppsOut: []raftpb.Message{msgApp(1), msgApp(3)},
otherMsgsOut: []raftpb.Message{otherMsg(2)},
},
{
msgsIn: []raftpb.Message{otherMsg(1), msgApp(2), otherMsg(3)},
msgAppsOut: []raftpb.Message{msgApp(2)},
otherMsgsOut: []raftpb.Message{otherMsg(1), otherMsg(3)},
},
}
for _, c := range testCases {
inStr := formatMsgs(c.msgsIn)
t.Run(inStr, func(t *testing.T) {
msgAppsRes, otherMsgsRes := splitMsgApps(c.msgsIn)
if !reflect.DeepEqual(msgAppsRes, c.msgAppsOut) || !reflect.DeepEqual(otherMsgsRes, c.otherMsgsOut) {
t.Errorf("expected splitMsgApps(%s)=%s/%s, found %s/%s", inStr, formatMsgs(c.msgAppsOut),
formatMsgs(c.otherMsgsOut), formatMsgs(msgAppsRes), formatMsgs(otherMsgsRes))
}
})
}
}

type testQuiescer struct {
numProposals int
status *raft.Status