From 1d1fa32ec22715f6e21a6d6486f865356c3eb4fc Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Sun, 15 May 2022 19:42:54 -0400 Subject: [PATCH] raft: Prototype async log writes This commit prototypes a change to the etcd/raft library that allows raft logs writes to be performed asynchronously from the state machine loop. Instead of each `Ready` struct requiring immediate durability of the attached entries before `Advance()`-ing, it requires a caller to eventually signal durability of those entries through the use of a new `StableTo` API. Chunks of log entries can be "in-flight" simultaneously and the state machine will only act on them when they are signaled to be durably written to the local log. In a sense, this change gives local disk I/O the same treatment that etcd/raft gives network I/O. As a result, it moves towards an interface that allows the state machine to run at a rate that is decoupled from disk and network I/O. The remaining piece here is async log application, which may be less impactful for performance (application is not durable) but should also be easier to support. The commit does not make an effort to be backwards compatible. --- raft/log.go | 10 ++++---- raft/log_unstable.go | 34 ++++++++++++++++++++++--- raft/node.go | 12 ++++----- raft/raft.go | 29 ++++++++++++++++----- raft/rawnode.go | 9 +++++++ raft/rawnode_test.go | 60 +++++++++++++++++++++++++++++++++++++++----- 6 files changed, 127 insertions(+), 27 deletions(-) diff --git a/raft/log.go b/raft/log.go index c94c41f7783..91f41cc1e3b 100644 --- a/raft/log.go +++ b/raft/log.go @@ -71,6 +71,7 @@ func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raf panic(err) // TODO(bdarnell) } log.unstable.offset = lastIndex + 1 + log.unstable.inProgressOffset = log.unstable.offset log.unstable.logger = logger // Initialize our committed and applied pointers to the time of the last compaction. log.committed = firstIndex - 1 @@ -168,10 +169,7 @@ func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 { } func (l *raftLog) unstableEntries() []pb.Entry { - if len(l.unstable.entries) == 0 { - return nil - } - return l.unstable.entries + return l.unstable.notAlreadyInProgress() } // nextEnts returns all the available entries for execution. @@ -250,7 +248,9 @@ func (l *raftLog) appliedTo(i uint64) { l.applied = i } -func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) } +func (l *raftLog) inProgressTo(i, t uint64) { l.unstable.inProgressTo(i, t) } + +func (l *raftLog) stableTo(i, t uint64) bool { return l.unstable.stableTo(i, t) } func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) } diff --git a/raft/log_unstable.go b/raft/log_unstable.go index 230fd21f994..c0c62f17ac7 100644 --- a/raft/log_unstable.go +++ b/raft/log_unstable.go @@ -24,8 +24,9 @@ type unstable struct { // the incoming unstable snapshot, if any. snapshot *pb.Snapshot // all entries that have not yet been written to storage. - entries []pb.Entry - offset uint64 + entries []pb.Entry + offset uint64 + inProgressOffset uint64 logger Logger } @@ -72,7 +73,7 @@ func (u *unstable) maybeTerm(i uint64) (uint64, bool) { return u.entries[i-u.offset].Term, true } -func (u *unstable) stableTo(i, t uint64) { +func (u *unstable) inProgressTo(i, t uint64) { gt, ok := u.maybeTerm(i) if !ok { return @@ -80,11 +81,35 @@ func (u *unstable) stableTo(i, t uint64) { // if i < offset, term is matched with the snapshot // only update the unstable entries if term is matched with // an unstable entry. + if gt == t && i >= u.inProgressOffset { + u.inProgressOffset = i + 1 + } +} + +func (u *unstable) notAlreadyInProgress() []pb.Entry { + diff := int(u.inProgressOffset) - int(u.offset) + ents := u.entries[diff:] + if len(ents) == 0 { + return nil + } + return ents +} + +func (u *unstable) stableTo(i, t uint64) bool { + gt, ok := u.maybeTerm(i) + if !ok { + return false + } + // if i < offset, term is matched with the snapshot + // only update the unstable entries if term is matched with + // an unstable entry. if gt == t && i >= u.offset { u.entries = u.entries[i+1-u.offset:] u.offset = i + 1 + u.inProgressOffset = max(u.inProgressOffset, u.offset) u.shrinkEntriesArray() } + return gt == t } // shrinkEntriesArray discards the underlying array used by the entries slice @@ -114,6 +139,7 @@ func (u *unstable) stableSnapTo(i uint64) { func (u *unstable) restore(s pb.Snapshot) { u.offset = s.Metadata.Index + 1 + u.inProgressOffset = u.offset u.entries = nil u.snapshot = &s } @@ -130,11 +156,13 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) { // The log is being truncated to before our current offset // portion, so set the offset and replace the entries u.offset = after + u.inProgressOffset = u.offset u.entries = ents default: // truncate to after and copy to u.entries // then append u.logger.Infof("truncate the unstable entries before index %d", after) + u.inProgressOffset = u.offset u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...) u.entries = append(u.entries, ents...) } diff --git a/raft/node.go b/raft/node.go index 2a8f12ba416..050ce56cae4 100644 --- a/raft/node.go +++ b/raft/node.go @@ -66,8 +66,7 @@ type Ready struct { // The returned is only valid for the request that requested to read. ReadStates []ReadState - // Entries specifies entries to be saved to stable storage BEFORE - // Messages are sent. + // Entries specifies entries to be saved to stable storage. Entries []pb.Entry // Snapshot specifies the snapshot to be saved to stable storage. @@ -78,14 +77,13 @@ type Ready struct { // store. CommittedEntries []pb.Entry - // Messages specifies outbound messages to be sent AFTER Entries are - // committed to stable storage. + // Messages specifies outbound messages to be sent. // If it contains a MsgSnap message, the application MUST report back to raft // when the snapshot has been received or has failed by calling ReportSnapshot. Messages []pb.Message - // MustSync indicates whether the HardState and Entries must be synchronously - // written to disk or if an asynchronous write is permissible. + // MustSync indicates whether the HardState must be synchronously written to + // disk or if an asynchronous write is permissible. MustSync bool } @@ -584,5 +582,5 @@ func MustSync(st, prevst pb.HardState, entsnum int) bool { // currentTerm // votedFor // log entries[] - return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term + return st.Vote != prevst.Vote || st.Term != prevst.Term } diff --git a/raft/raft.go b/raft/raft.go index 73c3ca499d0..7a6a77024d6 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -263,6 +263,8 @@ type raft struct { msgs []pb.Message + msgAppCurTerm bool + // the leader id lead uint64 // leadTransferee is id of the leader transfer target when its value is not zero. @@ -566,13 +568,26 @@ func (r *raft) advance(rd Ready) { if len(rd.Entries) > 0 { e := rd.Entries[len(rd.Entries)-1] - r.raftLog.stableTo(e.Index, e.Term) + r.raftLog.inProgressTo(e.Index, e.Term) } if !IsEmptySnap(rd.Snapshot) { r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) } } +func (r *raft) stableTo(e pb.Entry) { + if r.raftLog.stableTo(e.Index, e.Term) { + if r.lead == r.id { + r.prs.Progress[r.id].MaybeUpdate(e.Index) + if r.maybeCommit() { + r.bcastAppend() + } + } else if r.lead != None && r.msgAppCurTerm { + r.send(pb.Message{To: r.lead, Type: pb.MsgAppResp, Index: e.Index}) + } + } +} + // maybeCommit attempts to advance the commit index. Returns true if // the commit index changed (in which case the caller should call // r.bcastAppend). @@ -587,6 +602,7 @@ func (r *raft) reset(term uint64) { r.Vote = None } r.lead = None + r.msgAppCurTerm = false r.electionElapsed = 0 r.heartbeatElapsed = 0 @@ -628,10 +644,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { return false } // use latest "last" index after truncate/append - li = r.raftLog.append(es...) - r.prs.Progress[r.id].MaybeUpdate(li) - // Regardless of maybeCommit's return, our caller will call bcastAppend. - r.maybeCommit() + r.raftLog.append(es...) return true } @@ -1475,7 +1488,11 @@ func (r *raft) handleAppendEntries(m pb.Message) { } if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { - r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) + r.msgAppCurTerm = true + if len(r.raftLog.unstable.entries) == 0 { + // Already stable. + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) + } } else { r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) diff --git a/raft/rawnode.go b/raft/rawnode.go index 4111d029dd6..9bfd70307cb 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -178,6 +178,15 @@ func (rn *RawNode) Advance(rd Ready) { rn.raft.advance(rd) } +// StableTo informs the RawNode that its log is durable up to the entries in the provided +// Ready. +func (rn *RawNode) StableTo(rd Ready) { + if len(rd.Entries) > 0 { + e := rd.Entries[len(rd.Entries)-1] + rn.raft.stableTo(e) + } +} + // Status returns the current status of the given group. This allocates, see // BasicStatus and WithProgress for allocation-friendlier choices. func (rn *RawNode) Status() Status { diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 898b0f12c3e..7dcef27b0cc 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -261,6 +261,7 @@ func TestRawNodeProposeAndConfChange(t *testing.T) { } } rawNode.Advance(rd) + rawNode.StableTo(rd) // Once we are the leader, propose a command and a ConfChange. if !proposed && rd.SoftState.Lead == rawNode.raft.id { if err = rawNode.Propose([]byte("somedata")); err != nil { @@ -423,6 +424,7 @@ func TestRawNodeJointAutoLeave(t *testing.T) { } } rawNode.Advance(rd) + rawNode.StableTo(rd) // Once we are the leader, propose a command and a ConfChange. if !proposed && rd.SoftState.Lead == rawNode.raft.id { if err = rawNode.Propose([]byte("somedata")); err != nil { @@ -483,6 +485,13 @@ func TestRawNodeJointAutoLeave(t *testing.T) { rd = rawNode.Ready() s.Append(rd.Entries) rawNode.Advance(rd) + rawNode.StableTo(rd) + + rd = rawNode.Ready() + s.Append(rd.Entries) + rawNode.Advance(rd) + rawNode.StableTo(rd) + rd = rawNode.Ready() s.Append(rd.Entries) @@ -517,6 +526,7 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { rd := rawNode.Ready() s.Append(rd.Entries) rawNode.Advance(rd) + rawNode.StableTo(rd) rawNode.Campaign() for { @@ -524,9 +534,11 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { s.Append(rd.Entries) if rd.SoftState.Lead == rawNode.raft.id { rawNode.Advance(rd) + rawNode.StableTo(rd) break } rawNode.Advance(rd) + rawNode.StableTo(rd) } proposeConfChangeAndApply := func(cc pb.ConfChange) { @@ -541,6 +553,7 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { } } rawNode.Advance(rd) + rawNode.StableTo(rd) } cc1 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1} @@ -610,6 +623,7 @@ func TestRawNodeReadIndex(t *testing.T) { } s.Append(rd.Entries) rawNode.Advance(rd) + rawNode.StableTo(rd) // ensure raft.readStates is reset after advance if rawNode.raft.readStates != nil { t.Errorf("readStates = %v, want %v", rawNode.raft.readStates, nil) @@ -623,6 +637,7 @@ func TestRawNodeReadIndex(t *testing.T) { if rd.SoftState.Lead == rawNode.raft.id { rawNode.Advance(rd) + rawNode.StableTo(rd) // Once we are the leader, issue a ReadIndex request rawNode.raft.step = appendStep @@ -630,6 +645,7 @@ func TestRawNodeReadIndex(t *testing.T) { break } rawNode.Advance(rd) + rawNode.StableTo(rd) } // ensure that MsgReadIndex message is sent to the underlying raft if len(msgs) != 1 { @@ -656,18 +672,22 @@ func TestRawNodeReadIndex(t *testing.T) { // requires the application to bootstrap the state, i.e. it does not accept peers // and will not create faux configuration change entries. func TestRawNodeStart(t *testing.T) { - want := Ready{ + wantFirst := Ready{ SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, - HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1}, + HardState: pb.HardState{Term: 1, Commit: 1, Vote: 1}, Entries: []pb.Entry{ {Term: 1, Index: 2, Data: nil}, // empty entry {Term: 1, Index: 3, Data: []byte("foo")}, // empty entry }, + MustSync: true, + } + wantSecond := Ready{ + SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, + HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1}, CommittedEntries: []pb.Entry{ {Term: 1, Index: 2, Data: nil}, // empty entry {Term: 1, Index: 3, Data: []byte("foo")}, // empty entry }, - MustSync: true, } storage := NewMemoryStorage() @@ -749,11 +769,23 @@ func TestRawNodeStart(t *testing.T) { rd := rawNode.Ready() storage.Append(rd.Entries) rawNode.Advance(rd) + rawNode.StableTo(rd) - rd.SoftState, want.SoftState = nil, nil + rd.SoftState, wantFirst.SoftState = nil, nil - if !reflect.DeepEqual(rd, want) { - t.Fatalf("unexpected Ready:\n%+v\nvs\n%+v", rd, want) + if !reflect.DeepEqual(rd, wantFirst) { + t.Fatalf("unexpected Ready:\n%+v\nvs\n%+v", rd, wantFirst) + } + + rd = rawNode.Ready() + storage.Append(rd.Entries) + rawNode.Advance(rd) + rawNode.StableTo(rd) + + rd.SoftState, wantSecond.SoftState = nil, nil + + if !reflect.DeepEqual(rd, wantSecond) { + t.Fatalf("unexpected Ready:\n%+v\nvs\n%+v", rd, wantSecond) } if rawNode.HasReady() { @@ -787,6 +819,7 @@ func TestRawNodeRestart(t *testing.T) { t.Errorf("g = %+v,\n w %+v", rd, want) } rawNode.Advance(rd) + rawNode.StableTo(rd) if rawNode.HasReady() { t.Errorf("unexpected Ready: %+v", rawNode.Ready()) } @@ -824,6 +857,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) { t.Errorf("g = %+v,\n w %+v", rd, want) } else { rawNode.Advance(rd) + rawNode.StableTo(rd) } if rawNode.HasReady() { t.Errorf("unexpected Ready: %+v", rawNode.HasReady()) @@ -933,6 +967,7 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { } highestApplied = rd.CommittedEntries[n-1].Index rawNode.Advance(rd) + rawNode.StableTo(rd) rawNode.Step(pb.Message{ Type: pb.MsgHeartbeat, To: 1, @@ -963,6 +998,7 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { rd := rawNode.Ready() s.Append(rd.Entries) rawNode.Advance(rd) + rawNode.StableTo(rd) // Become the leader. rawNode.Campaign() @@ -971,9 +1007,11 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { s.Append(rd.Entries) if rd.SoftState.Lead == rawNode.raft.id { rawNode.Advance(rd) + rawNode.StableTo(rd) break } rawNode.Advance(rd) + rawNode.StableTo(rd) } // Simulate a network partition while we make our proposals by never @@ -995,12 +1033,21 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { // Recover from the partition. The uncommitted tail of the Raft log should // disappear as entries are committed. + rd = rawNode.Ready() + if len(rd.CommittedEntries) != 1 { + t.Fatalf("expected %d entries, got %d", 1, len(rd.CommittedEntries)) + } + s.Append(rd.Entries) + rawNode.Advance(rd) + rawNode.StableTo(rd) + rd = rawNode.Ready() if len(rd.CommittedEntries) != maxEntries { t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries)) } s.Append(rd.Entries) rawNode.Advance(rd) + rawNode.StableTo(rd) checkUncommitted(0) } @@ -1101,6 +1148,7 @@ func TestRawNodeConsumeReady(t *testing.T) { // Add a message to raft to make sure that Advance() doesn't drop it. rn.raft.msgs = append(rn.raft.msgs, m2) rn.Advance(rd) + rn.StableTo(rd) if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m2) { t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs) }