Skip to content

Commit

Permalink
raft: Prototype async log writes
Browse files Browse the repository at this point in the history
This commit prototypes a change to the etcd/raft library that allows raft logs
writes to be performed asynchronously from the state machine loop.

Instead of each `Ready` struct requiring immediate durability of the attached
entries before `Advance()`-ing, it requires a caller to eventually signal
durability of those entries through the use of a new `StableTo` API. Chunks of
log entries can be "in-flight" simultaneously and the state machine will only
act on them when they are signaled to be durably written to the local log.

In a sense, this change gives local disk I/O the same treatment that etcd/raft
gives network I/O. As a result, it moves towards an interface that allows the
state machine to run at a rate that is decoupled from disk and network I/O. The
remaining piece here is async log application, which may be less impactful for
performance (application is not durable) but should also be easier to support.

The commit does not make an effort to be backwards compatible.
  • Loading branch information
nvanbenschoten committed Aug 29, 2022
1 parent e51c697 commit 1d1fa32
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 27 deletions.
10 changes: 5 additions & 5 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raf
panic(err) // TODO(bdarnell)
}
log.unstable.offset = lastIndex + 1
log.unstable.inProgressOffset = log.unstable.offset
log.unstable.logger = logger
// Initialize our committed and applied pointers to the time of the last compaction.
log.committed = firstIndex - 1
Expand Down Expand Up @@ -168,10 +169,7 @@ func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 {
}

func (l *raftLog) unstableEntries() []pb.Entry {
if len(l.unstable.entries) == 0 {
return nil
}
return l.unstable.entries
return l.unstable.notAlreadyInProgress()
}

// nextEnts returns all the available entries for execution.
Expand Down Expand Up @@ -250,7 +248,9 @@ func (l *raftLog) appliedTo(i uint64) {
l.applied = i
}

func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
func (l *raftLog) inProgressTo(i, t uint64) { l.unstable.inProgressTo(i, t) }

func (l *raftLog) stableTo(i, t uint64) bool { return l.unstable.stableTo(i, t) }

func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }

Expand Down
34 changes: 31 additions & 3 deletions raft/log_unstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ type unstable struct {
// the incoming unstable snapshot, if any.
snapshot *pb.Snapshot
// all entries that have not yet been written to storage.
entries []pb.Entry
offset uint64
entries []pb.Entry
offset uint64
inProgressOffset uint64

logger Logger
}
Expand Down Expand Up @@ -72,19 +73,43 @@ func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
return u.entries[i-u.offset].Term, true
}

func (u *unstable) stableTo(i, t uint64) {
func (u *unstable) inProgressTo(i, t uint64) {
gt, ok := u.maybeTerm(i)
if !ok {
return
}
// if i < offset, term is matched with the snapshot
// only update the unstable entries if term is matched with
// an unstable entry.
if gt == t && i >= u.inProgressOffset {
u.inProgressOffset = i + 1
}
}

func (u *unstable) notAlreadyInProgress() []pb.Entry {
diff := int(u.inProgressOffset) - int(u.offset)
ents := u.entries[diff:]
if len(ents) == 0 {
return nil
}
return ents
}

func (u *unstable) stableTo(i, t uint64) bool {
gt, ok := u.maybeTerm(i)
if !ok {
return false
}
// if i < offset, term is matched with the snapshot
// only update the unstable entries if term is matched with
// an unstable entry.
if gt == t && i >= u.offset {
u.entries = u.entries[i+1-u.offset:]
u.offset = i + 1
u.inProgressOffset = max(u.inProgressOffset, u.offset)
u.shrinkEntriesArray()
}
return gt == t
}

// shrinkEntriesArray discards the underlying array used by the entries slice
Expand Down Expand Up @@ -114,6 +139,7 @@ func (u *unstable) stableSnapTo(i uint64) {

func (u *unstable) restore(s pb.Snapshot) {
u.offset = s.Metadata.Index + 1
u.inProgressOffset = u.offset
u.entries = nil
u.snapshot = &s
}
Expand All @@ -130,11 +156,13 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) {
// The log is being truncated to before our current offset
// portion, so set the offset and replace the entries
u.offset = after
u.inProgressOffset = u.offset
u.entries = ents
default:
// truncate to after and copy to u.entries
// then append
u.logger.Infof("truncate the unstable entries before index %d", after)
u.inProgressOffset = u.offset
u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
u.entries = append(u.entries, ents...)
}
Expand Down
12 changes: 5 additions & 7 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ type Ready struct {
// The returned is only valid for the request that requested to read.
ReadStates []ReadState

// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
// Entries specifies entries to be saved to stable storage.
Entries []pb.Entry

// Snapshot specifies the snapshot to be saved to stable storage.
Expand All @@ -78,14 +77,13 @@ type Ready struct {
// store.
CommittedEntries []pb.Entry

// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// Messages specifies outbound messages to be sent.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message

// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
// MustSync indicates whether the HardState must be synchronously written to
// disk or if an asynchronous write is permissible.
MustSync bool
}

Expand Down Expand Up @@ -584,5 +582,5 @@ func MustSync(st, prevst pb.HardState, entsnum int) bool {
// currentTerm
// votedFor
// log entries[]
return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
return st.Vote != prevst.Vote || st.Term != prevst.Term
}
29 changes: 23 additions & 6 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ type raft struct {

msgs []pb.Message

msgAppCurTerm bool

// the leader id
lead uint64
// leadTransferee is id of the leader transfer target when its value is not zero.
Expand Down Expand Up @@ -566,13 +568,26 @@ func (r *raft) advance(rd Ready) {

if len(rd.Entries) > 0 {
e := rd.Entries[len(rd.Entries)-1]
r.raftLog.stableTo(e.Index, e.Term)
r.raftLog.inProgressTo(e.Index, e.Term)
}
if !IsEmptySnap(rd.Snapshot) {
r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
}
}

func (r *raft) stableTo(e pb.Entry) {
if r.raftLog.stableTo(e.Index, e.Term) {
if r.lead == r.id {
r.prs.Progress[r.id].MaybeUpdate(e.Index)
if r.maybeCommit() {
r.bcastAppend()
}
} else if r.lead != None && r.msgAppCurTerm {
r.send(pb.Message{To: r.lead, Type: pb.MsgAppResp, Index: e.Index})
}
}
}

// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
Expand All @@ -587,6 +602,7 @@ func (r *raft) reset(term uint64) {
r.Vote = None
}
r.lead = None
r.msgAppCurTerm = false

r.electionElapsed = 0
r.heartbeatElapsed = 0
Expand Down Expand Up @@ -628,10 +644,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
return false
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
r.prs.Progress[r.id].MaybeUpdate(li)
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
r.raftLog.append(es...)
return true
}

Expand Down Expand Up @@ -1475,7 +1488,11 @@ func (r *raft) handleAppendEntries(m pb.Message) {
}

if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
r.msgAppCurTerm = true
if len(r.raftLog.unstable.entries) == 0 {
// Already stable.
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
}
} else {
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
Expand Down
9 changes: 9 additions & 0 deletions raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,15 @@ func (rn *RawNode) Advance(rd Ready) {
rn.raft.advance(rd)
}

// StableTo informs the RawNode that its log is durable up to the entries in the provided
// Ready.
func (rn *RawNode) StableTo(rd Ready) {
if len(rd.Entries) > 0 {
e := rd.Entries[len(rd.Entries)-1]
rn.raft.stableTo(e)
}
}

// Status returns the current status of the given group. This allocates, see
// BasicStatus and WithProgress for allocation-friendlier choices.
func (rn *RawNode) Status() Status {
Expand Down
Loading

0 comments on commit 1d1fa32

Please sign in to comment.