diff --git a/doc.go b/doc.go
index 4febfe60..06253f4e 100644
--- a/doc.go
+++ b/doc.go
@@ -169,6 +169,94 @@ given ID MUST be used only once even if the old node has been removed. This
 means that for example IP addresses make poor node IDs since they may be
 reused. Node IDs must be non-zero.
 
+# Usage with Asynchronous Storage Writes
+
+The library can be configured with an alternate interface for local storage
+writes that can provide better performance in the presence of high proposal
+concurrency by minimizing interference between proposals. This feature is
+called AsyncStorageWrites, and can be enabled using the flag on the Config
+struct with the same name.
+
+When Asynchronous Storage Writes is enabled, the responsibility of code using
+the library is different from what was presented above. Users still read from
+the Node.Ready() channel. However, they process the updates it contains in a
+different manner. Users no longer consult the HardState, Entries, and Snapshot
+fields (steps 1 and 3 above). They also no longer call Node.Advance() to
+indicate that they have processed all entries in the Ready (step 4 above).
+Instead, all local storage operations are also communicated through messages
+present in the Ready.Messages slice.
+
+The local storage messages come in two flavors. The first flavor is log append
+messages, which target a LocalAppendThread and carry Entries, HardState, and a
+Snapshot. The second flavor is entry application messages, which target a
+LocalApplyThread and carry CommittedEntries. Messages to the same target must
+be reliably processed in order. Messages to different targets can be processed
+in any order.
+
+Each local storage message carries a slice of response messages that must be
+delivered after the corresponding storage write has been completed. These
+responses may target the same node or may target other nodes.
+
+With Asynchronous Storage Writes enabled, the total state machine handling loop
+will look something like this:
+
+	for {
+		select {
+		case <-s.Ticker:
+			n.Tick()
+		case rd := <-s.Node.Ready():
+			for _, m := range rd.Messages {
+				switch m.To {
+				case raft.LocalAppendThread:
+					toAppend <- m
+				case raft.LocalApplyThread:
+					toApply <- m
+				default:
+					sendOverNetwork(m)
+				}
+			}
+		case <-s.done:
+			return
+		}
+	}
+
+Usage of Asynchronous Storage Writes will typically also include a pair of
+storage handler threads, one for log writes (append) and one for entry
+application to the local state machine (apply). Those will look something like:
+
+	// append thread
+	go func() {
+		for {
+			select {
+			case m := <-toAppend:
+				saveToStorage(m.HardState, m.Entries, m.Snapshot)
+				send(m.Responses)
+			case <-s.done:
+				return
+			}
+		}
+	}()
+
+	// apply thread
+	go func() {
+		for {
+			select {
+			case m := <-toApply:
+				for _, entry := range m.Entries {
+					process(entry)
+					if entry.Type == raftpb.EntryConfChange {
+						var cc raftpb.ConfChange
+						cc.Unmarshal(entry.Data)
+						s.Node.ApplyConfChange(cc)
+					}
+				}
+				send(m.Responses)
+			case <-s.done:
+				return
+			}
+		}
+	}()
+
 # Implementation notes
 
 This implementation is up to date with the final Raft thesis
@@ -295,5 +383,17 @@ stale log entries:
 	that the follower that sent this 'MsgUnreachable' is not reachable, often
 	indicating 'MsgApp' is lost. When follower's progress state is replicate,
 	the leader sets it back to probe.
+
+	'MsgStorageAppend' is a message from a node to its local append storage
+	thread to write entries, hard state, and/or a snapshot to stable storage.
+ The message will carry one or more responses, one of which will be a + 'MsgStorageAppendResp' back to itself. The responses can also contain + 'MsgAppResp', 'MsgVoteResp', and 'MsgPreVoteResp' messages. Used with + AsynchronousStorageWrites. + + 'MsgStorageApply' is a message from a node to its local apply storage + thread to apply committed entries. The message will carry one response, + which will be a 'MsgStorageApplyResp' back to itself. Used with + AsynchronousStorageWrites. */ package raft diff --git a/log.go b/log.go index 3aad898c..82efda40 100644 --- a/log.go +++ b/log.go @@ -191,6 +191,13 @@ func (l *raftLog) hasNextUnstableEnts() bool { return len(l.nextUnstableEnts()) > 0 } +// hasNextOrInProgressUnstableEnts returns if there are any entries that are +// available to be written to the local stable log or in the process of being +// written to the local stable log. +func (l *raftLog) hasNextOrInProgressUnstableEnts() bool { + return len(l.unstable.entries) > 0 +} + // nextCommittedEnts returns all the available entries for execution. // Entries can be committed even when the local raft instance has not durably // appended them to the local raft log yet. If allowUnstable is true, committed diff --git a/log_unstable.go b/log_unstable.go index 5caa433e..6e4298de 100644 --- a/log_unstable.go +++ b/log_unstable.go @@ -127,6 +127,10 @@ func (u *unstable) acceptInProgress() { // stableTo marks entries up to the entry with the specified (index, term) as // being successfully written to stable storage. +// +// The method should only be called when the caller can attest that the entries +// can not be overwritten by an in-progress log append. See the related comment +// in newStorageAppendRespMsg. func (u *unstable) stableTo(i, t uint64) { gt, ok := u.maybeTerm(i) if !ok { diff --git a/node.go b/node.go index 8ef1ae05..5afeb44d 100644 --- a/node.go +++ b/node.go @@ -57,7 +57,12 @@ type Ready struct { // The current state of a Node to be saved to stable storage BEFORE // Messages are sent. + // // HardState will be equal to empty state if there is no update. + // + // If async storage writes are enabled, this field does not need to be acted + // on immediately. It will be reflected in a MsgStorageAppend message in the + // Messages slice. pb.HardState // ReadStates can be used for node to serve linearizable read requests locally @@ -68,24 +73,44 @@ type Ready struct { // Entries specifies entries to be saved to stable storage BEFORE // Messages are sent. + // + // If async storage writes are enabled, this field does not need to be acted + // on immediately. It will be reflected in a MsgStorageAppend message in the + // Messages slice. Entries []pb.Entry // Snapshot specifies the snapshot to be saved to stable storage. + // + // If async storage writes are enabled, this field does not need to be acted + // on immediately. It will be reflected in a MsgStorageAppend message in the + // Messages slice. Snapshot pb.Snapshot // CommittedEntries specifies entries to be committed to a - // store/state-machine. These have previously been committed to stable - // store. + // store/state-machine. These have previously been appended to stable + // storage. + // + // If async storage writes are enabled, this field does not need to be acted + // on immediately. It will be reflected in a MsgStorageApply message in the + // Messages slice. CommittedEntries []pb.Entry - // Messages specifies outbound messages to be sent AFTER Entries are - // committed to stable storage. 
+ // Messages specifies outbound messages. + // + // If async storage writes are not enabled, these messages must be sent + // AFTER Entries are appended to stable storage. + // + // If async storage writes are enabled, these messages can be sent + // immediately as the messages that have the completion of the async writes + // as a precondition are attached to the individual MsgStorage{Append,Apply} + // messages instead. + // // If it contains a MsgSnap message, the application MUST report back to raft // when the snapshot has been received or has failed by calling ReportSnapshot. Messages []pb.Message - // MustSync indicates whether the HardState and Entries must be synchronously - // written to disk or if an asynchronous write is permissible. + // MustSync indicates whether the HardState and Entries must be durably + // written to disk or if a non-durable write is permissible. MustSync bool } @@ -103,19 +128,6 @@ func IsEmptySnap(sp pb.Snapshot) bool { return sp.Metadata.Index == 0 } -// appliedCursor extracts from the Ready the highest index the client has -// applied (once the Ready is confirmed via Advance). If no information is -// contained in the Ready, returns zero. -func (rd Ready) appliedCursor() uint64 { - if n := len(rd.CommittedEntries); n > 0 { - return rd.CommittedEntries[n-1].Index - } - if index := rd.Snapshot.Metadata.Index; index > 0 { - return index - } - return 0 -} - // Node represents a node in a raft cluster. type Node interface { // Tick increments the internal logical clock for the Node by a single tick. Election @@ -144,7 +156,8 @@ type Node interface { Step(ctx context.Context, msg pb.Message) error // Ready returns a channel that returns the current point-in-time state. - // Users of the Node must call Advance after retrieving the state returned by Ready. + // Users of the Node must call Advance after retrieving the state returned by Ready (unless + // async storage writes is enabled, in which case it should never be called). // // NOTE: No committed entries from the next Ready may be applied until all committed entries // and snapshots from the previous one have finished. @@ -159,6 +172,9 @@ type Node interface { // commands. For example. when the last Ready contains a snapshot, the application might take // a long time to apply the snapshot data. To continue receiving Ready without blocking raft // progress, it can call Advance before finishing applying the last ready. + // + // NOTE: Advance must not be called when using AsyncStorageWrites. Response messages from the + // local append and apply threads take its place. Advance() // ApplyConfChange applies a config change (previously passed to // ProposeConfChange) to the node. This must be called whenever a config @@ -309,9 +325,7 @@ func (n *node) run() { lead := None for { - if advancec != nil { - readyc = nil - } else if n.rn.HasReady() { + if advancec == nil && n.rn.HasReady() { // Populate a Ready. Note that this Ready is not guaranteed to // actually be handled. We will arm readyc, but there's no guarantee // that we will actually send on it. It's possible that we will @@ -352,10 +366,11 @@ func (n *node) run() { close(pm.result) } case m := <-n.recvc: - // filter out response message from unknown From. - if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) { - r.Step(m) + if IsResponseMsg(m.Type) && !IsLocalMsgTarget(m.From) && r.prs.Progress[m.From] == nil { + // Filter out response message from unknown From. 
+ break } + r.Step(m) case cc := <-n.confc: _, okBefore := r.prs.Progress[r.id] cs := r.applyConfChange(cc) @@ -393,7 +408,12 @@ func (n *node) run() { n.rn.Tick() case readyc <- rd: n.rn.acceptReady(rd) - advancec = n.advancec + if !n.rn.raft.asyncStorageWrites { + advancec = n.advancec + } else { + rd = Ready{} + } + readyc = nil case <-advancec: n.rn.Advance(rd) rd = Ready{} @@ -425,8 +445,8 @@ func (n *node) Propose(ctx context.Context, data []byte) error { } func (n *node) Step(ctx context.Context, m pb.Message) error { - // ignore unexpected local messages receiving over network - if IsLocalMsg(m.Type) { + // Ignore unexpected local messages receiving over network. + if IsLocalMsg(m.Type) && !IsLocalMsgTarget(m.From) { // TODO: return an error? return nil } @@ -562,7 +582,7 @@ func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { rd := Ready{ Entries: r.raftLog.nextUnstableEnts(), - CommittedEntries: r.raftLog.nextCommittedEnts(true /* allowUnstable */), + CommittedEntries: r.raftLog.nextCommittedEnts(!r.asyncStorageWrites), Messages: r.msgs, } if softSt := r.softState(); !softSt.equal(prevSoftSt) { @@ -578,6 +598,30 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { rd.ReadStates = r.readStates } rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries)) + + if r.asyncStorageWrites { + // If async storage writes are enabled, enqueue messages to + // local storage threads, where applicable. + if needStorageAppend(rd, len(r.msgsAfterAppend) > 0) { + m := newStorageAppendMsg(r, rd) + rd.Messages = append(rd.Messages, m) + } + if needStorageApply(rd) { + m := newStorageApplyMsg(r, rd) + rd.Messages = append(rd.Messages, m) + } + } else { + // If async storage writes are disabled, immediately enqueue + // msgsAfterAppend to be sent out. The Ready struct contract + // mandates that Messages cannot be sent until after Entries + // are written to stable storage. + for _, m := range r.msgsAfterAppend { + if m.To != r.id { + rd.Messages = append(rd.Messages, m) + } + } + } + return rd } @@ -591,3 +635,184 @@ func MustSync(st, prevst pb.HardState, entsnum int) bool { // log entries[] return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term } + +func needStorageAppend(rd Ready, haveMsgsAfterAppend bool) bool { + // Return true if log entries, hard state, or a snapshot need to be written + // to stable storage. Also return true if any messages are contingent on all + // prior MsgStorageAppend being processed. + return len(rd.Entries) > 0 || + !IsEmptyHardState(rd.HardState) || + !IsEmptySnap(rd.Snapshot) || + haveMsgsAfterAppend +} + +// newStorageAppendMsg creates the message that should be sent to the local +// append thread to instruct it to append log entries, write an updated hard +// state, and apply a snapshot. The message also carries a set of responses +// that should be delivered after the rest of the message is processed. Used +// with AsyncStorageWrites. 
+func newStorageAppendMsg(r *raft, rd Ready) pb.Message { + m := pb.Message{ + Type: pb.MsgStorageAppend, + To: LocalAppendThread, + From: r.id, + Term: r.Term, + Entries: rd.Entries, + } + if !IsEmptyHardState(rd.HardState) { + hs := rd.HardState + m.HardState = &hs + } + if !IsEmptySnap(rd.Snapshot) { + snap := rd.Snapshot + m.Snapshot = &snap + } + // Attach all messages in msgsAfterAppend as responses to be delivered after + // the message is processed, along with a self-directed MsgStorageAppendResp + // to acknowledge the entry stability. + // + // NB: it is important for performance that MsgStorageAppendResp message be + // handled after self-directed MsgAppResp messages on the leader (which will + // be contained in msgsAfterAppend). This ordering allows the MsgAppResp + // handling to use a fast-path in r.raftLog.term() before the newly appended + // entries are removed from the unstable log. + m.Responses = r.msgsAfterAppend + m.Responses = append(m.Responses, newStorageAppendRespMsg(r, rd)) + return m +} + +// newStorageAppendRespMsg creates the message that should be returned to node +// after the unstable log entries, hard state, and snapshot in the current Ready +// (along with those in all prior Ready structs) have been saved to stable +// storage. +func newStorageAppendRespMsg(r *raft, rd Ready) pb.Message { + m := pb.Message{ + Type: pb.MsgStorageAppendResp, + To: r.id, + From: LocalAppendThread, + // Dropped after term change, see below. + Term: r.Term, + } + if r.raftLog.hasNextOrInProgressUnstableEnts() { + // If the raft log has unstable entries, attach the last index and term to the + // response message. This (index, term) tuple will be handed back and consulted + // when the stability of those log entries is signaled to the unstable. If the + // (index, term) match the unstable log by the time the response is received, + // the unstable log can be truncated. + // + // However, with just this logic, there would be an ABA problem that could lead + // to the unstable log and the stable log getting out of sync temporarily and + // leading to an inconsistent view. Consider the following example with 5 nodes, + // A B C D E: + // + // 1. A is the leader. + // 2. A proposes some log entries but only B receives these entries. + // 3. B gets the Ready and the entries are appended asynchronously. + // 4. A crashes and C becomes leader after getting a vote from D and E. + // 5. C proposes some log entries and B receives these entries, overwriting the + // previous unstable log entries that are in the process of being appended. + // The entries have a larger term than the previous entries but the same + // indexes. It begins appending these new entries asynchronously. + // 6. C crashes and A restarts and becomes leader again after getting the vote + // from D and E. + // 7. B receives the entries from A which are the same as the ones from step 2, + // overwriting the previous unstable log entries that are in the process of + // being appended from step 5. The entries have the original terms and + // indexes from step 2. Recall that log entries retain their original term + // numbers when a leader replicates entries from previous terms. It begins + // appending these new entries asynchronously. + // 8. The asynchronous log appends from the first Ready complete and stableTo + // is called. + // 9. 
However, the log entries from the second Ready are still in the
+		//    asynchronous append pipeline and will overwrite (in stable storage) the
+		//    entries from the first Ready at some future point. We can't truncate the
+		//    unstable log yet or a future read from Storage might see the entries from
+		//    step 5 before they have been replaced by the entries from step 7.
+		//    Instead, we must wait until we are sure that the entries are stable and
+		//    that no in-progress appends might overwrite them before removing entries
+		//    from the unstable log.
+		//
+		// To prevent these kinds of problems, we also attach the current term to the
+		// MsgStorageAppendResp (above). If the term has changed by the time the
+		// MsgStorageAppendResp is returned, the response is ignored and the unstable
+		// log is not truncated. The unstable log is only truncated when the term has
+		// remained unchanged from the time that the MsgStorageAppend was sent to the
+		// time that the MsgStorageAppendResp is received, indicating that no one else
+		// is in the process of truncating the stable log.
+		//
+		// However, this replaces a correctness problem with a liveness problem. If we
+		// only attempted to truncate the unstable log when appending new entries but
+		// also occasionally dropped these responses, then quiescence of new log
+		// entries could lead to the unstable log never being truncated.
+		//
+		// To combat this, we attempt to truncate the log on all MsgStorageAppendResp
+		// messages where the unstable log is not empty, not just those associated
+		// with entry appends. This includes MsgStorageAppendResp messages associated
+		// with an updated HardState, which occur after a term change.
+		//
+		// In other words, we set Index and LogTerm in a block that looks like:
+		//
+		//  if r.raftLog.hasNextOrInProgressUnstableEnts() { ... }
+		//
+		// not like:
+		//
+		//  if len(rd.Entries) > 0 { ... }
+		//
+		// To do so, we attach r.raftLog.lastIndex() and r.raftLog.lastTerm(), not the
+		// (index, term) of the last entry in rd.Entries. If rd.Entries is not empty,
+		// these will be the same. However, if rd.Entries is empty, we still want to
+		// attest that this (index, term) is correct at the current term, in case the
+		// MsgStorageAppend that contained the last entry in the unstable slice carried
+		// an earlier term and was dropped.
+		// TODO(nvanbenschoten): test this behavior in a data-driven test.
+		m.Index = r.raftLog.lastIndex()
+		m.LogTerm = r.raftLog.lastTerm()
+	}
+	if !IsEmptySnap(rd.Snapshot) {
+		snap := rd.Snapshot
+		m.Snapshot = &snap
+	}
+	return m
+}
+
+func needStorageApply(rd Ready) bool {
+	return len(rd.CommittedEntries) > 0
+}
+
+// newStorageApplyMsg creates the message that should be sent to the local
+// apply thread to instruct it to apply committed log entries. The message
+// also carries a response that should be delivered after the rest of the
+// message is processed. Used with AsyncStorageWrites.
+func newStorageApplyMsg(r *raft, rd Ready) pb.Message { + ents := rd.CommittedEntries + last := ents[len(ents)-1].Index + return pb.Message{ + Type: pb.MsgStorageApply, + To: LocalApplyThread, + From: r.id, + Term: 0, // committed entries don't apply under a specific term + Entries: ents, + Index: last, + Responses: []pb.Message{ + newStorageApplyRespMsg(r, ents), + }, + } +} + +// newStorageApplyRespMsg creates the message that should be returned to node +// after the committed entries in the current Ready (along with those in all +// prior Ready structs) have been applied to the local state machine. +func newStorageApplyRespMsg(r *raft, committedEnts []pb.Entry) pb.Message { + last := committedEnts[len(committedEnts)-1].Index + size := r.getUncommittedSize(committedEnts) + return pb.Message{ + Type: pb.MsgStorageApplyResp, + To: r.id, + From: LocalApplyThread, + Term: 0, // committed entries don't apply under a specific term + Index: last, + // NOTE: we abuse the LogTerm field to store the aggregate entry size so + // that we don't need to introduce a new field on Message. + LogTerm: size, + } +} diff --git a/raft.go b/raft.go index e279baeb..33b058bc 100644 --- a/raft.go +++ b/raft.go @@ -31,9 +31,18 @@ import ( "go.etcd.io/raft/v3/tracker" ) -// None is a placeholder node ID used when there is no leader. -const None uint64 = 0 -const noLimit = math.MaxUint64 +const ( + // None is a placeholder node ID used when there is no leader. + None uint64 = 0 + // LocalAppendThread is a reference to a local thread that saves unstable + // log entries and snapshots to stable storage. The identifier is used as a + // target for MsgStorageAppend messages when AsyncStorageWrites is enabled. + LocalAppendThread uint64 = math.MaxUint64 + // LocalApplyThread is a reference to a local thread that applies committed + // log entries to the local state machine. The identifier is used as a + // target for MsgStorageApply messages when AsyncStorageWrites is enabled. + LocalApplyThread uint64 = math.MaxUint64 - 1 +) // Possible values for StateType. const ( @@ -70,6 +79,8 @@ const ( campaignTransfer CampaignType = "CampaignTransfer" ) +const noLimit = math.MaxUint64 + // ErrProposalDropped is returned when the proposal is ignored by some cases, // so that the proposer can be notified and fail fast. var ErrProposalDropped = errors.New("raft proposal dropped") @@ -140,6 +151,42 @@ type Config struct { // applied entries. This is a very application dependent configuration. Applied uint64 + // AsyncStorageWrites configures the raft node to write to its local storage + // (raft log and state machine) using a request/response message passing + // interface instead of the default Ready/Advance function call interface. + // Local storage messages can be pipelined and processed asynchronously + // (with respect to Ready iteration), facilitating reduced interference + // between Raft proposals and increased batching of log appends and state + // machine application. As a result, use of asynchronous storage writes can + // reduce end-to-end commit latency and increase maximum throughput. + // + // When true, the Ready.Message slice will include MsgStorageAppend and + // MsgStorageApply messages. The messages will target a LocalAppendThread + // and a LocalApplyThread, respectively. Messages to the same target must be + // reliably processed in order. In other words, they can't be dropped (like + // messages over the network) and those targeted at the same thread can't be + // reordered. 
Messages to different targets can be processed in any order. + // + // MsgStorageAppend carries Raft log entries to append, election votes to + // persist, and snapshots to apply. All writes performed in service of a + // MsgStorageAppend must be durable before response messages are delivered. + // However, if the MsgStorageAppend carries no response messages, durability + // is not required. The message assumes the role of the Entries, HardState, + // and Snapshot fields in Ready. + // + // MsgStorageApply carries committed entries to apply. Writes performed in + // service of a MsgStorageApply need not be durable before response messages + // are delivered. The message assumes the role of the CommittedEntries field + // in Ready. + // + // Local messages each carry one or more response messages which should be + // delivered after the corresponding storage write has been completed. These + // responses may target the same node or may target other nodes. The storage + // threads are not responsible for understanding the response messages, only + // for delivering them to the correct target after performing the storage + // write. + AsyncStorageWrites bool + // MaxSizePerMsg limits the max byte size of each append message. Smaller // value lowers the raft recovery cost(initial probing and message lost // during normal operation). On the other side, it might affect the @@ -212,6 +259,9 @@ func (c *Config) validate() error { if c.ID == None { return errors.New("cannot use none as id") } + if IsLocalMsgTarget(c.ID) { + return errors.New("cannot use local target as id") + } if c.HeartbeatTick <= 0 { return errors.New("heartbeat tick must be greater than 0") @@ -276,15 +326,22 @@ type raft struct { // isLearner is true if the local raft node is a learner. isLearner bool + // msgs contains the list of messages that should be sent out immediately to + // other nodes. + // + // Messages in this list must target other nodes. msgs []pb.Message - - // voteSelfOnAdvance is a marker that the local raft node should vote for - // itself upon its next call to advance. This is not meant to be the final - // approach towards handling self-votes, but it's a useful intermediate - // point to get all tests working and to write some additional tests that - // demonstrate possible race conditions when self-voting is asynchronous. - // This is replaced in a later commit. - voteSelfOnAdvance pb.Message + // msgsAfterAppend contains the list of messages that should be sent after + // the accumulated unstable state (e.g. term, vote, []entry, and snapshot) + // has been persisted to durable storage. This includes waiting for any + // unstable state that is already in the process of being persisted (i.e. + // has already been handed out in a prior Ready struct) to complete. + // + // Messages in this list may target other nodes or may target this node. + // + // Messages in this list have the type MsgAppResp, MsgVoteResp, or + // MsgPreVoteResp. See the comment in raft.send for details. + msgsAfterAppend []pb.Message // the leader id lead uint64 @@ -315,8 +372,9 @@ type raft struct { // only leader keeps heartbeatElapsed. 
heartbeatElapsed int - checkQuorum bool - preVote bool + asyncStorageWrites bool + checkQuorum bool + preVote bool heartbeatTimeout int electionTimeout int @@ -359,6 +417,7 @@ func newRaft(c *Config) *raft { electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, + asyncStorageWrites: c.AsyncStorageWrites, checkQuorum: c.CheckQuorum, preVote: c.PreVote, readOnly: newReadOnly(c.ReadOnlyOption), @@ -424,11 +483,11 @@ func (r *raft) send(m pb.Message) { // - MsgPreVoteResp: m.Term is the term received in the original // MsgPreVote if the pre-vote was granted, non-zero for the // same reasons MsgPreVote is - panic(fmt.Sprintf("term should be set when sending %s", m.Type)) + r.logger.Panicf("term should be set when sending %s", m.Type) } } else { if m.Term != 0 { - panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term)) + r.logger.Panicf("term should not be set when sending %s (was %d)", m.Type, m.Term) } // do not attach term to MsgProp, MsgReadIndex // proposals are a way to forward to the leader and @@ -438,10 +497,59 @@ func (r *raft) send(m pb.Message) { m.Term = r.Term } } - if m.To == r.id { - r.logger.Panicf("message should not be self-addressed when sending %s", m.Type) + if m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVoteResp { + // If async storage writes are enabled, messages added to the msgs slice + // are allowed to be sent out before unstable state (e.g. log entry + // writes and election votes) have been durably synced to the local + // disk. + // + // For most message types, this is not an issue. However, response + // messages that relate to "voting" on either leader election or log + // appends require durability before they can be sent. It would be + // incorrect to publish a vote in an election before that vote has been + // synced to stable storage locally. Similarly, it would be incorrect to + // acknowledge a log append to the leader before that entry has been + // synced to stable storage locally. + // + // Per the Raft thesis, section 3.8 Persisted state and server restarts: + // + // > Raft servers must persist enough information to stable storage to + // > survive server restarts safely. In particular, each server persists + // > its current term and vote; this is necessary to prevent the server + // > from voting twice in the same term or replacing log entries from a + // > newer leader with those from a deposed leader. Each server also + // > persists new log entries before they are counted towards the entries’ + // > commitment; this prevents committed entries from being lost or + // > “uncommitted” when servers restart + // + // To enforce this durability requirement, these response messages are + // queued to be sent out as soon as the current collection of unstable + // state (the state that the response message was predicated upon) has + // been durably persisted. This unstable state may have already been + // passed to a Ready struct whose persistence is in progress or may be + // waiting for the next Ready struct to begin being written to Storage. + // These messages must wait for all of this state to be durable before + // being published. + // + // Rejected responses (m.Reject == true) present an interesting case + // where the durability requirement is less unambiguous. A rejection may + // be predicated upon unstable state. For instance, a node may reject a + // vote for one peer because it has already begun syncing its vote for + // another peer. 
Or it may reject a vote from one peer because it has + // unstable log entries that indicate that the peer is behind on its + // log. In these cases, it is likely safe to send out the rejection + // response immediately without compromising safety in the presence of a + // server restart. However, because these rejections are rare and + // because the safety of such behavior has not been formally verified, + // we err on the side of safety and omit a `&& !m.Reject` condition + // above. + r.msgsAfterAppend = append(r.msgsAfterAppend, m) + } else { + if m.To == r.id { + r.logger.Panicf("message should not be self-addressed when sending %s", m.Type) + } + r.msgs = append(r.msgs, m) } - r.msgs = append(r.msgs, m) } // sendAppend sends an append RPC with new entries (if any) and the @@ -570,61 +678,39 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { }) } -func (r *raft) advance(rd Ready) { - r.reduceUncommittedSize(rd.CommittedEntries) - - // If entries were applied (or a snapshot), update our cursor for - // the next Ready. Note that if the current HardState contains a - // new Commit index, this does not mean that we're also applying - // all of the new entries due to commit pagination by size. - if newApplied := rd.appliedCursor(); newApplied > 0 { - r.raftLog.appliedTo(newApplied) - - if r.prs.Config.AutoLeave && newApplied >= r.pendingConfIndex && r.state == StateLeader { - // If the current (and most recent, at least for this leader's term) - // configuration should be auto-left, initiate that now. We use a - // nil Data which unmarshals into an empty ConfChangeV2 and has the - // benefit that appendEntry can never refuse it based on its size - // (which registers as zero). - m, err := confChangeToMsg(nil) - if err != nil { - panic(err) - } - // NB: this proposal can't be dropped due to size, but can be - // dropped if a leadership transfer is in progress. We'll keep - // checking this condition on each applied entry, so either the - // leadership transfer will succeed and the new leader will leave - // the joint configuration, or the leadership transfer will fail, - // and we will propose the config change on the next advance. - if err := r.Step(m); err != nil { - r.logger.Debugf("not initiating automatic transition out of joint configuration %s: %v", r.prs.Config, err) - } else { - r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config) - } +func (r *raft) appliedTo(index uint64) { + oldApplied := r.raftLog.applied + newApplied := max(index, oldApplied) + r.raftLog.appliedTo(newApplied) + + if r.prs.Config.AutoLeave && newApplied >= r.pendingConfIndex && r.state == StateLeader { + // If the current (and most recent, at least for this leader's term) + // configuration should be auto-left, initiate that now. We use a + // nil Data which unmarshals into an empty ConfChangeV2 and has the + // benefit that appendEntry can never refuse it based on its size + // (which registers as zero). + m, err := confChangeToMsg(nil) + if err != nil { + panic(err) } - } - - if len(rd.Entries) > 0 { - e := rd.Entries[len(rd.Entries)-1] - if r.id == r.lead { - // The leader needs to self-ack the entries just appended (since it doesn't - // send an MsgApp to itself). 
This is roughly equivalent to: - // - // r.prs.Progress[r.id].MaybeUpdate(e.Index) - // if r.maybeCommit() { - // r.bcastAppend() - // } - _ = r.Step(pb.Message{From: r.id, Type: pb.MsgAppResp, Index: e.Index}) + // NB: this proposal can't be dropped due to size, but can be + // dropped if a leadership transfer is in progress. We'll keep + // checking this condition on each applied entry, so either the + // leadership transfer will succeed and the new leader will leave + // the joint configuration, or the leadership transfer will fail, + // and we will propose the config change on the next advance. + if err := r.Step(m); err != nil { + r.logger.Debugf("not initiating automatic transition out of joint configuration %s: %v", r.prs.Config, err) + } else { + r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config) } - // NB: it's important for performance that this call happens after - // r.Step above on the leader. This is because r.Step can then use - // a fast-path for `r.raftLog.term()`. - r.raftLog.stableTo(e.Index, e.Term) - } - if !IsEmptySnap(rd.Snapshot) { - r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) } - r.maybeVoteForSelf() +} + +func (r *raft) appliedSnap(snap *pb.Snapshot) { + index := snap.Metadata.Index + r.raftLog.stableSnapTo(index) + r.appliedTo(index) } // maybeCommit attempts to advance the commit index. Returns true if @@ -635,22 +721,6 @@ func (r *raft) maybeCommit() bool { return r.raftLog.maybeCommit(mci, r.Term) } -// maybeVoteForSelf attempts to inform a (pre-)candidate node that its -// vote for itself has been made durable and can now be counted towards -// the active election, if one is still ongoing. Returns true if the -// node was informed of a self-vote. -func (r *raft) maybeVoteForSelf() bool { - if r.voteSelfOnAdvance.Type == 0 { - return false - } - voteMsg := r.voteSelfOnAdvance - // NB: Clear the voteSelfOnAdvance marker before calling Step. - // Step may re-set the marker and cause us to loop. - r.voteSelfOnAdvance = pb.Message{} - _ = r.Step(voteMsg) - return true -} - func (r *raft) reset(term uint64) { if r.Term != term { r.Term = term @@ -698,7 +768,18 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { return false } // use latest "last" index after truncate/append - r.raftLog.append(es...) + li = r.raftLog.append(es...) + // The leader needs to self-ack the entries just appended once they have + // been durably persisted (since it doesn't send an MsgApp to itself). This + // response message will be added to msgsAfterAppend and delivered back to + // this node after these entries have been written to stable storage. When + // handled, this is roughly equivalent to: + // + // r.prs.Progress[r.id].MaybeUpdate(e.Index) + // if r.maybeCommit() { + // r.bcastAppend() + // } + r.send(pb.Message{To: r.id, Type: pb.MsgAppResp, Index: li}) return true } @@ -818,7 +899,7 @@ func (r *raft) becomeLeader() { // uncommitted log quota. This is because we want to preserve the // behavior of allowing one entry larger than quota if the current // usage is zero. 
- r.reduceUncommittedSize([]pb.Entry{emptyEnt}) + r.uncommittedSize = 0 r.logger.Infof("%x became leader at term %d", r.id, r.Term) } @@ -876,7 +957,12 @@ func (r *raft) campaign(t CampaignType) { } for _, id := range ids { if id == r.id { - r.voteSelfOnAdvance = pb.Message{To: id, From: id, Term: term, Type: voteRespMsgType(voteMsg)} + // The candidate votes for itself and should account for this self + // vote once the vote has been durably persisted (since it doesn't + // send a MsgVote to itself). This response message will be added to + // msgsAfterAppend and delivered back to this node after the vote + // has been written to stable storage. + r.send(pb.Message{To: id, Term: term, Type: voteRespMsgType(voteMsg)}) continue } r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d", @@ -967,6 +1053,21 @@ func (r *raft) Step(m pb.Message) error { r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true}) + } else if m.Type == pb.MsgStorageAppendResp { + if m.Index != 0 { + // Don't consider the appended log entries to be stable because + // they may have been overwritten in the unstable log during a + // later term. See the comment in newStorageAppendResp for more + // about this race. + r.logger.Infof("%x [term: %d] ignored entry appends from a %s message with lower term [term: %d]", + r.id, r.Term, m.Type, m.Term) + } + if m.Snapshot != nil { + // Even if the snapshot applied under a different term, its + // application is still valid. Snapshots carry committed + // (term-independent) state. + r.appliedSnap(m.Snapshot) + } } else { // ignore other cases r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]", @@ -983,6 +1084,20 @@ func (r *raft) Step(m pb.Message) error { r.hup(campaignElection) } + case pb.MsgStorageAppendResp: + if m.Index != 0 { + r.raftLog.stableTo(m.Index, m.LogTerm) + } + if m.Snapshot != nil { + r.appliedSnap(m.Snapshot) + } + + case pb.MsgStorageApplyResp: + r.appliedTo(m.Index) + // NOTE: we abuse the LogTerm field to store the aggregate entry size so + // that we don't need to introduce a new field on Message. + r.reduceUncommittedSize(m.LogTerm) + case pb.MsgVote, pb.MsgPreVote: // We can vote if this is a repeat of a vote we've already cast... canVote := r.Vote == m.From || @@ -1835,14 +1950,19 @@ func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { return true } -// reduceUncommittedSize accounts for the newly committed entries by decreasing -// the uncommitted entry size limit. -func (r *raft) reduceUncommittedSize(ents []pb.Entry) { +// getUncommittedSize computes the aggregate size of the provided entries. +func (r *raft) getUncommittedSize(ents []pb.Entry) uint64 { if r.uncommittedSize == 0 { // Fast-path for followers, who do not track or enforce the limit. - return + return 0 } - if s := payloadsSize(ents); s > r.uncommittedSize { + return payloadsSize(ents) +} + +// reduceUncommittedSize accounts for the newly committed entries by decreasing +// the uncommitted entry size limit. +func (r *raft) reduceUncommittedSize(s uint64) { + if s > r.uncommittedSize { // uncommittedSize may underestimate the size of the uncommitted Raft // log tail but will never overestimate it. Saturate at 0 instead of // allowing overflow. 
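For reference, enabling the new mode from application code only requires setting the flag introduced above when constructing the Config. This is a sketch rather than part of the change; the tick, storage, and flow-control values are only illustrative and follow the library's existing examples:

	storage := raft.NewMemoryStorage()
	c := &raft.Config{
		ID:                 0x01,
		ElectionTick:       10,
		HeartbeatTick:      1,
		Storage:            storage,
		MaxSizePerMsg:      1 << 20,
		MaxInflightMsgs:    256,
		AsyncStorageWrites: true,
	}
	n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})
	// With AsyncStorageWrites enabled, n.Advance() is never called. The
	// self-addressed MsgStorageAppendResp and MsgStorageApplyResp responses,
	// stepped back into the node via n.Step(), take its place.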
diff --git a/raft_paper_test.go b/raft_paper_test.go index 72942c08..585168d0 100644 --- a/raft_paper_test.go +++ b/raft_paper_test.go @@ -162,7 +162,7 @@ func testNonleaderStartElection(t *testing.T, state StateType) { for i := 1; i < 2*et; i++ { r.tick() } - r.maybeVoteForSelf() + r.advanceMessagesAfterAppend() if r.Term != 2 { t.Errorf("term = %d, want 2", r.Term) @@ -219,7 +219,7 @@ func TestLeaderElectionInOneRoundRPC(t *testing.T) { r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(idsBySize(tt.size)...))) r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) - r.maybeVoteForSelf() + r.advanceMessagesAfterAppend() for id, vote := range tt.votes { r.Step(pb.Message{From: id, To: 1, Term: r.Term, Type: pb.MsgVoteResp, Reject: !vote}) } @@ -255,7 +255,7 @@ func TestFollowerVote(t *testing.T) { r.Step(pb.Message{From: tt.nvote, To: 1, Term: 1, Type: pb.MsgVote}) - msgs := r.readMessages() + msgs := r.msgsAfterAppend wmsgs := []pb.Message{ {From: 1, To: tt.nvote, Term: 1, Type: pb.MsgVoteResp, Reject: tt.wreject}, } @@ -497,11 +497,8 @@ func TestLeaderAcknowledgeCommit(t *testing.T) { commitNoopEntry(r, s) li := r.raftLog.lastIndex() r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) - - rd := newReady(r, &SoftState{}, pb.HardState{}) - s.Append(rd.Entries) - r.advance(rd) // simulate having appended entry on leader - for _, m := range rd.Messages { + r.advanceMessagesAfterAppend() + for _, m := range r.msgs { if tt.nonLeaderAcceptors[m.To] { r.Step(acceptAndReply(m)) } @@ -896,9 +893,7 @@ func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Term: r.Term, Index: tt.index}) - rd := newReady(r, &SoftState{}, pb.HardState{}) - storage.Append(rd.Entries) - r.advance(rd) + r.advanceMessagesAfterAppend() if r.raftLog.committed != tt.wcommit { t.Errorf("#%d: commit = %d, want %d", i, r.raftLog.committed, tt.wcommit) } diff --git a/raft_test.go b/raft_test.go index 030e59b5..f5979168 100644 --- a/raft_test.go +++ b/raft_test.go @@ -27,17 +27,19 @@ import ( "go.etcd.io/raft/v3/tracker" ) -// nextEnts returns the appliable entries and updates the applied index +// nextEnts returns the appliable entries and updates the applied index. func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) { - for { - rd := newReady(r, &SoftState{}, pb.HardState{}) - s.Append(rd.Entries) - r.advance(rd) - if len(rd.Entries)+len(rd.CommittedEntries) == 0 { - return ents - } - ents = append(ents, rd.CommittedEntries...) - } + // Append unstable entries. + s.Append(r.raftLog.nextUnstableEnts()) + r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm()) + + // Run post-append steps. + r.advanceMessagesAfterAppend() + + // Return committed entries. 
+ ents = r.raftLog.nextCommittedEnts(true) + r.raftLog.appliedTo(r.raftLog.committed) + return ents } func mustAppendEntry(r *raft, ents ...pb.Entry) { @@ -49,16 +51,45 @@ func mustAppendEntry(r *raft, ents ...pb.Entry) { type stateMachine interface { Step(m pb.Message) error readMessages() []pb.Message - maybeVoteForSelf() bool + advanceMessagesAfterAppend() } func (r *raft) readMessages() []pb.Message { + r.advanceMessagesAfterAppend() msgs := r.msgs - r.msgs = make([]pb.Message, 0) + r.msgs = nil + return msgs +} + +func (r *raft) advanceMessagesAfterAppend() { + for { + msgs := r.takeMessagesAfterAppend() + if len(msgs) == 0 { + break + } + r.stepOrSend(msgs) + } +} +func (r *raft) takeMessagesAfterAppend() []pb.Message { + msgs := r.msgsAfterAppend + r.msgsAfterAppend = nil return msgs } +func (r *raft) stepOrSend(msgs []pb.Message) error { + for _, m := range msgs { + if m.To == r.id { + if err := r.Step(m); err != nil { + return err + } + } else { + r.msgs = append(r.msgs, m) + } + } + return nil +} + func TestProgressLeader(t *testing.T) { s := newTestMemoryStorage(withPeers(1, 2)) r := newTestRaft(1, 5, 1, s) @@ -76,11 +107,11 @@ func TestProgressLeader(t *testing.T) { if m := r.prs.Progress[1].Match; m != 0 { t.Fatalf("expected zero match, got %d", m) } - rd := newReady(r, &SoftState{}, pb.HardState{}) - if len(rd.Entries) != 6 || len(rd.Entries[0].Data) > 0 || string(rd.Entries[5].Data) != "foo" { - t.Fatalf("unexpected Entries: %s", DescribeReady(rd, nil)) + ents := r.raftLog.nextUnstableEnts() + if len(ents) != 6 || len(ents[0].Data) > 0 || string(ents[5].Data) != "foo" { + t.Fatalf("unexpected entries: %v", ents) } - r.advance(rd) + r.advanceMessagesAfterAppend() if m := r.prs.Progress[1].Match; m != 6 { t.Fatalf("unexpected Match %d", m) } @@ -252,7 +283,7 @@ func TestUncommittedEntryLimit(t *testing.T) { if e := maxEntries * numFollowers; len(ms) != e { t.Fatalf("expected %d messages, got %d", e, len(ms)) } - r.reduceUncommittedSize(propEnts) + r.reduceUncommittedSize(r.getUncommittedSize(propEnts)) if r.uncommittedSize != 0 { t.Fatalf("committed everything, but still tracking %d", r.uncommittedSize) } @@ -288,7 +319,7 @@ func TestUncommittedEntryLimit(t *testing.T) { if e := 2 * numFollowers; len(ms) != e { t.Fatalf("expected %d messages, got %d", e, len(ms)) } - r.reduceUncommittedSize(propEnts) + r.reduceUncommittedSize(r.getUncommittedSize(propEnts)) if n := r.uncommittedSize; n != 0 { t.Fatalf("expected zero uncommitted size, got %d", n) } @@ -383,8 +414,8 @@ func TestLearnerPromotion(t *testing.T) { setRandomizedElectionTimeout(n1, n1.electionTimeout) for i := 0; i < n1.electionTimeout; i++ { n1.tick() - n1.maybeVoteForSelf() } + n1.advanceMessagesAfterAppend() if n1.state != StateLeader { t.Errorf("peer 1 state: %s, want %s", n1.state, StateLeader) @@ -405,8 +436,8 @@ func TestLearnerPromotion(t *testing.T) { setRandomizedElectionTimeout(n2, n2.electionTimeout) for i := 0; i < n2.electionTimeout; i++ { n2.tick() - n2.maybeVoteForSelf() } + n2.advanceMessagesAfterAppend() nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat}) @@ -427,10 +458,11 @@ func TestLearnerCanVote(t *testing.T) { n2.Step(pb.Message{From: 1, To: 2, Term: 2, Type: pb.MsgVote, LogTerm: 11, Index: 11}) - if len(n2.msgs) != 1 { - t.Fatalf("expected exactly one message, not %+v", n2.msgs) + msgs := n2.readMessages() + if len(msgs) != 1 { + t.Fatalf("expected exactly one message, not %+v", msgs) } - msg := n2.msgs[0] + msg := msgs[0] if msg.Type != pb.MsgVoteResp && !msg.Reject { t.Fatal("expected 
learner to not reject vote") } @@ -588,10 +620,11 @@ func testVoteFromAnyState(t *testing.T, vt pb.MessageType) { if err := r.Step(msg); err != nil { t.Errorf("%s,%s: Step failed: %s", vt, st, err) } - if len(r.msgs) != 1 { - t.Errorf("%s,%s: %d response messages, want 1: %+v", vt, st, len(r.msgs), r.msgs) + msgs := r.readMessages() + if len(msgs) != 1 { + t.Errorf("%s,%s: %d response messages, want 1: %+v", vt, st, len(msgs), msgs) } else { - resp := r.msgs[0] + resp := msgs[0] if resp.Type != voteRespMsgType(vt) { t.Errorf("%s,%s: response message is %s, want %s", vt, st, resp.Type, voteRespMsgType(vt)) @@ -703,8 +736,8 @@ func TestLearnerLogReplication(t *testing.T) { setRandomizedElectionTimeout(n1, n1.electionTimeout) for i := 0; i < n1.electionTimeout; i++ { n1.tick() - n1.maybeVoteForSelf() } + n1.advanceMessagesAfterAppend() nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) @@ -719,20 +752,11 @@ func TestLearnerLogReplication(t *testing.T) { nextCommitted := uint64(2) { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - rd := newReady(n1, &SoftState{}, pb.HardState{}) - nt.send(rd.Messages...) - s1.Append(rd.Entries) - n1.advance(rd) } if n1.raftLog.committed != nextCommitted { t.Errorf("peer 1 wants committed to %d, but still %d", nextCommitted, n1.raftLog.committed) } - { - rd := newReady(n1, &SoftState{}, pb.HardState{}) - nt.send(rd.Messages...) - } - if n1.raftLog.committed != n2.raftLog.committed { t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed) } @@ -752,10 +776,6 @@ func TestSingleNodeCommit(t *testing.T) { tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) - rd := newReady(r, &SoftState{}, pb.HardState{}) - s.Append(rd.Entries) - r.advance(rd) - sm := tt.peers[1].(*raft) if sm.raftLog.committed != 3 { t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3) @@ -1755,8 +1775,8 @@ func testCandidateResetTerm(t *testing.T, mt pb.MessageType) { c.resetRandomizedElectionTimeout() for i := 0; i < c.randomizedElectionTimeout; i++ { c.tick() - c.maybeVoteForSelf() } + c.advanceMessagesAfterAppend() if c.state != StateCandidate { t.Errorf("state = %s, want %s", c.state, StateCandidate) @@ -1796,7 +1816,7 @@ func testCandidateSelfVoteAfterLostElection(t *testing.T, preVote bool) { // n1 calls an election. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) - voteMsg := sm.voteSelfOnAdvance + steps := sm.takeMessagesAfterAppend() // n1 hears that n2 already won the election before it has had a // change to sync its vote to disk and account for its self-vote. @@ -1807,7 +1827,7 @@ func testCandidateSelfVoteAfterLostElection(t *testing.T, preVote bool) { } // n1 remains a follower even after its self-vote is delivered. - sm.Step(voteMsg) + sm.stepOrSend(steps) if sm.state != StateFollower { t.Errorf("state = %v, want %v", sm.state, StateFollower) } @@ -1825,10 +1845,10 @@ func TestCandidateDeliversPreCandidateSelfVoteAfterBecomingCandidate(t *testing. // n1 calls an election. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) - preVoteMsg := sm.voteSelfOnAdvance if sm.state != StatePreCandidate { t.Errorf("state = %v, want %v", sm.state, StatePreCandidate) } + steps := sm.takeMessagesAfterAppend() // n1 receives pre-candidate votes from both other peers before // voting for itself. n1 becomes a candidate. 
@@ -1841,11 +1861,11 @@ func TestCandidateDeliversPreCandidateSelfVoteAfterBecomingCandidate(t *testing. // n1 remains a candidate even after its delayed pre-vote self-vote is // delivered. - sm.Step(preVoteMsg) - voteMsg := sm.voteSelfOnAdvance + sm.stepOrSend(steps) if sm.state != StateCandidate { t.Errorf("state = %v, want %v", sm.state, StateCandidate) } + steps = sm.takeMessagesAfterAppend() // Its pre-vote self-vote does not make its way to its ProgressTracker. granted, _, _ := sm.prs.TallyVotes() @@ -1861,12 +1881,35 @@ func TestCandidateDeliversPreCandidateSelfVoteAfterBecomingCandidate(t *testing. // n1 becomes the leader once its self-vote is received because now // quorum is reached. - sm.Step(voteMsg) + sm.stepOrSend(steps) if sm.state != StateLeader { t.Errorf("state = %v, want %v", sm.state, StateLeader) } } +func TestLeaderMsgAppSelfAckAfterTermChange(t *testing.T) { + sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3))) + sm.becomeCandidate() + sm.becomeLeader() + + // n1 proposes a write. + sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) + steps := sm.takeMessagesAfterAppend() + + // n1 hears that n2 is the new leader. + sm.Step(pb.Message{From: 2, To: 1, Term: sm.Term + 1, Type: pb.MsgHeartbeat}) + if sm.state != StateFollower { + t.Errorf("state = %v, want %v", sm.state, StateFollower) + } + + // n1 advances, ignoring its earlier self-ack of its MsgApp. The + // corresponding MsgAppResp is ignored because it carries an earlier term. + sm.stepOrSend(steps) + if sm.state != StateFollower { + t.Errorf("state = %v, want %v", sm.state, StateFollower) + } +} + func TestLeaderStepdownWhenQuorumActive(t *testing.T) { sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3))) @@ -2625,6 +2668,8 @@ func TestBcastBeat(t *testing.T) { for i := 0; i < 10; i++ { mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1}) } + sm.advanceMessagesAfterAppend() + // slow follower sm.prs.Progress[2].Match, sm.prs.Progress[2].Next = 5, 6 // normal follower @@ -3052,12 +3097,9 @@ func TestLearnerReceiveSnapshot(t *testing.T) { n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) n1.restore(s) - ready := newReady(n1, &SoftState{}, pb.HardState{}) - store.ApplySnapshot(ready.Snapshot) - n1.advance(ready) - - // Force set n1 appplied index. - n1.raftLog.appliedTo(n1.raftLog.committed) + snap := n1.raftLog.nextUnstableSnapshot() + store.ApplySnapshot(*snap) + n1.appliedSnap(snap) nt := newNetwork(n1, n2) @@ -3476,15 +3518,13 @@ func testCampaignWhileLeader(t *testing.T, preVote bool) { // We don't call campaign() directly because it comes after the check // for our current state. 
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) - for r.maybeVoteForSelf() { - } + r.advanceMessagesAfterAppend() if r.state != StateLeader { t.Errorf("expected single-node election to become leader but got %s", r.state) } term := r.Term r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) - for r.maybeVoteForSelf() { - } + r.advanceMessagesAfterAppend() if r.state != StateLeader { t.Errorf("expected to remain leader but got %s", r.state) } @@ -3705,9 +3745,9 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) { // Apply snapshot and resume progress follower := nt.peers[3].(*raft) - ready := newReady(follower, &SoftState{}, pb.HardState{}) - nt.storage[3].ApplySnapshot(ready.Snapshot) - follower.advance(ready) + snap := follower.raftLog.nextUnstableSnapshot() + nt.storage[3].ApplySnapshot(*snap) + follower.appliedSnap(snap) nt.msgHook = nil nt.send(filtered) @@ -4851,9 +4891,8 @@ func (nw *network) send(msgs ...pb.Message) { if nw.t != nil { nw.t.Log(DescribeMessage(m, nil)) } - p.Step(m) - for p.maybeVoteForSelf() { - } + _ = p.Step(m) + p.advanceMessagesAfterAppend() msgs = append(msgs[1:], nw.filter(p.readMessages())...) } } @@ -4918,9 +4957,9 @@ type connem struct { type blackHole struct{} -func (blackHole) Step(pb.Message) error { return nil } -func (blackHole) readMessages() []pb.Message { return nil } -func (blackHole) maybeVoteForSelf() bool { return false } +func (blackHole) Step(pb.Message) error { return nil } +func (blackHole) readMessages() []pb.Message { return nil } +func (blackHole) advanceMessagesAfterAppend() {} var nopStepper = &blackHole{} diff --git a/rafttest/interaction_env.go b/rafttest/interaction_env.go index 3c245f07..a7dfc0cf 100644 --- a/rafttest/interaction_env.go +++ b/rafttest/interaction_env.go @@ -34,8 +34,10 @@ type Node struct { *raft.RawNode Storage - Config *raft.Config - History []pb.Snapshot + Config *raft.Config + AppendWork []pb.Message // []MsgStorageAppend + ApplyWork []pb.Message // []MsgStorageApply + History []pb.Snapshot } // InteractionEnv facilitates testing of complex interactions between the diff --git a/rafttest/interaction_env_handler.go b/rafttest/interaction_env_handler.go index 73e706fe..9f95bc12 100644 --- a/rafttest/interaction_env_handler.go +++ b/rafttest/interaction_env_handler.go @@ -43,7 +43,7 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string { case "add-nodes": // Example: // - // add-nodes voters=(1 2 3) learners=(4 5) index=2 content=foo + // add-nodes voters=(1 2 3) learners=(4 5) index=2 content=foo async-storage-writes=true err = env.handleAddNodes(t, d) case "campaign": // Example: @@ -67,6 +67,16 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string { // // process-ready 3 err = env.handleProcessReady(t, d) + case "process-append-thread": + // Example: + // + // process-append-thread 3 + err = env.handleProcessAppendThread(t, d) + case "process-apply-thread": + // Example: + // + // process-apply-thread 3 + err = env.handleProcessApplyThread(t, d) case "log-level": // Set the log level. NONE disables all output, including from the test // harness (except errors). 
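Taken together, the new directives split what used to be a single process-ready step. With async-storage-writes enabled on a node, process-ready only moves the MsgStorageAppend and MsgStorageApply messages onto that node's AppendWork and ApplyWork queues, and the two thread directives then each process one queued message and emit its responses. A typical sequence for node 1 in such a test would look like the following sketch (directive output omitted):

	process-ready 1
	process-append-thread 1
	process-apply-thread 1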
diff --git a/rafttest/interaction_env_handler_add_nodes.go b/rafttest/interaction_env_handler_add_nodes.go index f164a6d2..f086aed6 100644 --- a/rafttest/interaction_env_handler_add_nodes.go +++ b/rafttest/interaction_env_handler_add_nodes.go @@ -48,6 +48,8 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e cfg.Applied = snap.Metadata.Index case "content": arg.Scan(t, i, &snap.Data) + case "async-storage-writes": + arg.Scan(t, i, &cfg.AsyncStorageWrites) } } } diff --git a/rafttest/interaction_env_handler_process_append_thread.go b/rafttest/interaction_env_handler_process_append_thread.go new file mode 100644 index 00000000..3f74988a --- /dev/null +++ b/rafttest/interaction_env_handler_process_append_thread.go @@ -0,0 +1,97 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "testing" + + "github.com/cockroachdb/datadriven" + + "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" +) + +func (env *InteractionEnv) handleProcessAppendThread(t *testing.T, d datadriven.TestData) error { + idxs := nodeIdxs(t, d) + for _, idx := range idxs { + var err error + if len(idxs) > 1 { + fmt.Fprintf(env.Output, "> %d processing append thread\n", idx+1) + env.withIndent(func() { err = env.ProcessAppendThread(idx) }) + } else { + err = env.ProcessAppendThread(idx) + } + if err != nil { + return err + } + } + return nil +} + +// ProcessAppendThread runs processes a single message on the "append" thread of +// the node with the given index. +func (env *InteractionEnv) ProcessAppendThread(idx int) error { + n := &env.Nodes[idx] + if len(n.AppendWork) == 0 { + env.Output.WriteString("no append work to perform") + return nil + } + m := n.AppendWork[0] + n.AppendWork = n.AppendWork[1:] + + resps := m.Responses + m.Responses = nil + env.Output.WriteString("Processing:\n") + env.Output.WriteString(raft.DescribeMessage(m, defaultEntryFormatter) + "\n") + var st raftpb.HardState + if m.HardState != nil { + st = *m.HardState + } + var snap raftpb.Snapshot + if m.Snapshot != nil { + snap = *m.Snapshot + } + if err := processAppend(n, st, m.Entries, snap); err != nil { + return err + } + + env.Output.WriteString("Responses:\n") + for _, m := range resps { + env.Output.WriteString(raft.DescribeMessage(m, defaultEntryFormatter) + "\n") + } + env.Messages = append(env.Messages, resps...) + return nil +} + +func processAppend(n *Node, st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error { + // TODO(tbg): the order of operations here is not necessarily safe. 
+	// https://github.com/etcd-io/etcd/pull/10861
+	s := n.Storage
+	if !raft.IsEmptyHardState(st) {
+		if err := s.SetHardState(st); err != nil {
+			return err
+		}
+	}
+	if err := s.Append(ents); err != nil {
+		return err
+	}
+	if !raft.IsEmptySnap(snap) {
+		if err := s.ApplySnapshot(snap); err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/rafttest/interaction_env_handler_process_apply_thread.go b/rafttest/interaction_env_handler_process_apply_thread.go
new file mode 100644
index 00000000..d21317e0
--- /dev/null
+++ b/rafttest/interaction_env_handler_process_apply_thread.go
@@ -0,0 +1,111 @@
+// Copyright 2022 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafttest
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/cockroachdb/datadriven"
+
+	"go.etcd.io/raft/v3"
+	"go.etcd.io/raft/v3/raftpb"
+)
+
+func (env *InteractionEnv) handleProcessApplyThread(t *testing.T, d datadriven.TestData) error {
+	idxs := nodeIdxs(t, d)
+	for _, idx := range idxs {
+		var err error
+		if len(idxs) > 1 {
+			fmt.Fprintf(env.Output, "> %d processing apply thread\n", idx+1)
+			env.withIndent(func() { err = env.ProcessApplyThread(idx) })
+		} else {
+			err = env.ProcessApplyThread(idx)
+		}
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// ProcessApplyThread processes a single message on the "apply" thread of
+// the node with the given index.
+func (env *InteractionEnv) ProcessApplyThread(idx int) error {
+	n := &env.Nodes[idx]
+	if len(n.ApplyWork) == 0 {
+		env.Output.WriteString("no apply work to perform")
+		return nil
+	}
+	m := n.ApplyWork[0]
+	n.ApplyWork = n.ApplyWork[1:]
+
+	resps := m.Responses
+	m.Responses = nil
+	env.Output.WriteString("Processing:\n")
+	env.Output.WriteString(raft.DescribeMessage(m, defaultEntryFormatter) + "\n")
+	if err := processApply(n, m.Entries); err != nil {
+		return err
+	}
+
+	env.Output.WriteString("Responses:\n")
+	for _, m := range resps {
+		env.Output.WriteString(raft.DescribeMessage(m, defaultEntryFormatter) + "\n")
+	}
+	env.Messages = append(env.Messages, resps...)
+	return nil
+}
+
+func processApply(n *Node, ents []raftpb.Entry) error {
+	for _, ent := range ents {
+		var update []byte
+		var cs *raftpb.ConfState
+		switch ent.Type {
+		case raftpb.EntryConfChange:
+			var cc raftpb.ConfChange
+			if err := cc.Unmarshal(ent.Data); err != nil {
+				return err
+			}
+			update = cc.Context
+			cs = n.RawNode.ApplyConfChange(cc)
+		case raftpb.EntryConfChangeV2:
+			var cc raftpb.ConfChangeV2
+			if err := cc.Unmarshal(ent.Data); err != nil {
+				return err
+			}
+			cs = n.RawNode.ApplyConfChange(cc)
+			update = cc.Context
+		default:
+			update = ent.Data
+		}
+
+		// Record the new state by starting with the current state and applying
+		// the command.
+		lastSnap := n.History[len(n.History)-1]
+		var snap raftpb.Snapshot
+		snap.Data = append(snap.Data, lastSnap.Data...)
+		// NB: this hard-codes an "appender" state machine.
+		snap.Data = append(snap.Data, update...)
+ snap.Metadata.Index = ent.Index + snap.Metadata.Term = ent.Term + if cs == nil { + sl := n.History + cs = &sl[len(sl)-1].Metadata.ConfState + } + snap.Metadata.ConfState = *cs + n.History = append(n.History, snap) + } + return nil +} diff --git a/rafttest/interaction_env_handler_process_ready.go b/rafttest/interaction_env_handler_process_ready.go index 6b2ef18e..e72d3d9d 100644 --- a/rafttest/interaction_env_handler_process_ready.go +++ b/rafttest/interaction_env_handler_process_ready.go @@ -44,65 +44,39 @@ func (env *InteractionEnv) handleProcessReady(t *testing.T, d datadriven.TestDat // ProcessReady runs Ready handling on the node with the given index. func (env *InteractionEnv) ProcessReady(idx int) error { // TODO(tbg): Allow simulating crashes here. - rn, s := env.Nodes[idx].RawNode, env.Nodes[idx].Storage - rd := rn.Ready() + n := &env.Nodes[idx] + rd := n.Ready() env.Output.WriteString(raft.DescribeReady(rd, defaultEntryFormatter)) - // TODO(tbg): the order of operations here is not necessarily safe. See: - // https://github.com/etcd-io/etcd/pull/10861 - if !raft.IsEmptyHardState(rd.HardState) { - if err := s.SetHardState(rd.HardState); err != nil { + + if !n.Config.AsyncStorageWrites { + if err := processAppend(n, rd.HardState, rd.Entries, rd.Snapshot); err != nil { return err } - } - if err := s.Append(rd.Entries); err != nil { - return err - } - if !raft.IsEmptySnap(rd.Snapshot) { - if err := s.ApplySnapshot(rd.Snapshot); err != nil { + if err := processApply(n, rd.CommittedEntries); err != nil { return err } } - for _, ent := range rd.CommittedEntries { - var update []byte - var cs *raftpb.ConfState - switch ent.Type { - case raftpb.EntryConfChange: - var cc raftpb.ConfChange - if err := cc.Unmarshal(ent.Data); err != nil { - return err + + for _, m := range rd.Messages { + if raft.IsLocalMsgTarget(m.To) { + if !n.Config.AsyncStorageWrites { + panic("unexpected local msg target") } - update = cc.Context - cs = rn.ApplyConfChange(cc) - case raftpb.EntryConfChangeV2: - var cc raftpb.ConfChangeV2 - if err := cc.Unmarshal(ent.Data); err != nil { - return err + switch m.Type { + case raftpb.MsgStorageAppend: + n.AppendWork = append(n.AppendWork, m) + case raftpb.MsgStorageApply: + n.ApplyWork = append(n.ApplyWork, m) + default: + panic(fmt.Sprintf("unexpected message type %s", m.Type)) } - cs = rn.ApplyConfChange(cc) - update = cc.Context - default: - update = ent.Data - } - - // Record the new state by starting with the current state and applying - // the command. - lastSnap := env.Nodes[idx].History[len(env.Nodes[idx].History)-1] - var snap raftpb.Snapshot - snap.Data = append(snap.Data, lastSnap.Data...) - // NB: this hard-codes an "appender" state machine. - snap.Data = append(snap.Data, update...) - snap.Metadata.Index = ent.Index - snap.Metadata.Term = ent.Term - if cs == nil { - sl := env.Nodes[idx].History - cs = &sl[len(sl)-1].Metadata.ConfState + } else { + env.Messages = append(env.Messages, m) } - snap.Metadata.ConfState = *cs - env.Nodes[idx].History = append(env.Nodes[idx].History, snap) } - env.Messages = append(env.Messages, rd.Messages...) 
- - rn.Advance(rd) + if !n.Config.AsyncStorageWrites { + n.Advance(rd) + } return nil } diff --git a/rafttest/interaction_env_handler_stabilize.go b/rafttest/interaction_env_handler_stabilize.go index c4579be3..8fa6a8e8 100644 --- a/rafttest/interaction_env_handler_stabilize.go +++ b/rafttest/interaction_env_handler_stabilize.go @@ -31,22 +31,29 @@ func (env *InteractionEnv) handleStabilize(t *testing.T, d datadriven.TestData) // Stabilize repeatedly runs Ready handling on and message delivery to the set // of nodes specified via the idxs slice until reaching a fixed point. func (env *InteractionEnv) Stabilize(idxs ...int) error { - var nodes []Node - for _, idx := range idxs { - nodes = append(nodes, env.Nodes[idx]) - } - if len(nodes) == 0 { - nodes = env.Nodes + var nodes []*Node + if len(idxs) != 0 { + for _, idx := range idxs { + nodes = append(nodes, &env.Nodes[idx]) + } + } else { + for i := range env.Nodes { + nodes = append(nodes, &env.Nodes[i]) + } } for { done := true for _, rn := range nodes { if rn.HasReady() { - done = false idx := int(rn.Status().ID - 1) fmt.Fprintf(env.Output, "> %d handling Ready\n", idx+1) - env.withIndent(func() { env.ProcessReady(idx) }) + var err error + env.withIndent(func() { err = env.ProcessReady(idx) }) + if err != nil { + return err + } + done = false } } for _, rn := range nodes { @@ -59,6 +66,30 @@ func (env *InteractionEnv) Stabilize(idxs ...int) error { done = false } } + for _, rn := range nodes { + idx := int(rn.Status().ID - 1) + if len(rn.AppendWork) > 0 { + fmt.Fprintf(env.Output, "> %d processing append thread\n", idx+1) + for len(rn.AppendWork) > 0 { + var err error + env.withIndent(func() { err = env.ProcessAppendThread(idx) }) + if err != nil { + return err + } + } + done = false + } + } + for _, rn := range nodes { + idx := int(rn.Status().ID - 1) + if len(rn.ApplyWork) > 0 { + fmt.Fprintf(env.Output, "> %d processing apply thread\n", idx+1) + for len(rn.ApplyWork) > 0 { + env.withIndent(func() { env.ProcessApplyThread(idx) }) + } + done = false + } + } if done { return nil } diff --git a/rawnode.go b/rawnode.go index 82f2b0bf..9f463156 100644 --- a/rawnode.go +++ b/rawnode.go @@ -32,9 +32,10 @@ var ErrStepPeerNotFound = errors.New("raft: cannot step as peer not found") // The methods of this struct correspond to the methods of Node and are described // more fully there. type RawNode struct { - raft *raft - prevSoftSt *SoftState - prevHardSt pb.HardState + raft *raft + prevSoftSt *SoftState + prevHardSt pb.HardState + stepsOnAdvance []pb.Message } // NewRawNode instantiates a RawNode from the given configuration. @@ -108,14 +109,14 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState { // Step advances the state machine using the given message. func (rn *RawNode) Step(m pb.Message) error { - // ignore unexpected local messages receiving over network - if IsLocalMsg(m.Type) { + // Ignore unexpected local messages receiving over network. + if IsLocalMsg(m.Type) && !IsLocalMsgTarget(m.From) { return ErrStepLocalMsg } - if pr := rn.raft.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) { - return rn.raft.Step(m) + if IsResponseMsg(m.Type) && !IsLocalMsgTarget(m.From) && rn.raft.prs.Progress[m.From] == nil { + return ErrStepPeerNotFound } - return ErrStepPeerNotFound + return rn.raft.Step(m) } // Ready returns the outstanding work that the application needs to handle. 
This
@@ -147,10 +148,26 @@ func (rn *RawNode) acceptReady(rd Ready) {
 	if len(rd.ReadStates) != 0 {
 		rn.raft.readStates = nil
 	}
+	if !rn.raft.asyncStorageWrites {
+		if len(rn.stepsOnAdvance) != 0 {
+			rn.raft.logger.Panicf("two accepted Ready structs without call to Advance")
+		}
+		for _, m := range rn.raft.msgsAfterAppend {
+			if m.To == rn.raft.id {
+				rn.stepsOnAdvance = append(rn.stepsOnAdvance, m)
+			}
+		}
+		if needStorageAppend(rd, false /* haveMsgsAfterAppend */) {
+			m := newStorageAppendRespMsg(rn.raft, rd)
+			rn.stepsOnAdvance = append(rn.stepsOnAdvance, m)
+		}
+		if needStorageApply(rd) {
+			m := newStorageApplyRespMsg(rn.raft, rd.CommittedEntries)
+			rn.stepsOnAdvance = append(rn.stepsOnAdvance, m)
+		}
+	}
 	rn.raft.msgs = nil
-	// NB: this does not do anything yet, as entries and snapshots are always
-	// stabilized on the next Advance and committed entries are always applied
-	// by the next Advance.
+	rn.raft.msgsAfterAppend = nil
 	rn.raft.raftLog.acceptUnstable()
 	if len(rd.CommittedEntries) > 0 {
 		ents := rd.CommittedEntries
@@ -170,7 +187,10 @@ func (rn *RawNode) HasReady() bool {
 	if r.raftLog.hasNextUnstableSnapshot() {
 		return true
 	}
-	if len(r.msgs) > 0 || r.raftLog.hasNextUnstableEnts() || r.raftLog.hasNextCommittedEnts(true /* allowUnstable */) {
+	if len(r.msgs) > 0 || len(r.msgsAfterAppend) > 0 {
+		return true
+	}
+	if r.raftLog.hasNextUnstableEnts() || r.raftLog.hasNextCommittedEnts(!rn.raft.asyncStorageWrites) {
 		return true
 	}
 	if len(r.readStates) != 0 {
@@ -181,8 +201,21 @@
 // Advance notifies the RawNode that the application has applied and saved progress in the
 // last Ready results.
-func (rn *RawNode) Advance(rd Ready) {
-	rn.raft.advance(rd)
+//
+// NOTE: Advance must not be called when using AsyncStorageWrites. Response messages from
+// the local append and apply threads take its place.
+func (rn *RawNode) Advance(_ Ready) {
+	// The actions performed by this function are encoded into stepsOnAdvance in
+	// acceptReady. In earlier versions of this library, they were computed from
+	// the provided Ready struct. Retain the unused parameter for compatibility.
+	if rn.raft.asyncStorageWrites {
+		rn.raft.logger.Panicf("Advance must not be called when using AsyncStorageWrites")
+	}
+	for i, m := range rn.stepsOnAdvance {
+		_ = rn.raft.Step(m)
+		rn.stepsOnAdvance[i] = pb.Message{}
+	}
+	rn.stepsOnAdvance = rn.stepsOnAdvance[:0]
 }
 
 // Status returns the current status of the given group. This allocates, see
diff --git a/rawnode_test.go b/rawnode_test.go
index d9e8551b..ceaed813 100644
--- a/rawnode_test.go
+++ b/rawnode_test.go
@@ -342,6 +342,7 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
 			if len(rd.Entries) > 0 {
 				t.Fatal("expected no more entries")
 			}
+			rawNode.Advance(rd)
 			if tc.exp2 == nil {
 				return
 			}
@@ -370,6 +371,7 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
 			if exp := tc.exp2; !reflect.DeepEqual(exp, cs) {
 				t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs)
 			}
+			rawNode.Advance(rd)
 		})
 	}
 }
diff --git a/testdata/async_storage_writes.txt b/testdata/async_storage_writes.txt
new file mode 100644
index 00000000..3831edcb
--- /dev/null
+++ b/testdata/async_storage_writes.txt
@@ -0,0 +1,785 @@
+# Build a pipeline of in-progress entries to append. Let the pipeline grow to a
+# depth of 2 before we start allowing work on the append thread to complete.
+ +add-nodes 3 voters=(1,2,3) index=10 async-storage-writes=true +---- +INFO 1 switched to configuration voters=(1 2 3) +INFO 1 became follower at term 0 +INFO newRaft 1 [peers: [1,2,3], term: 0, commit: 10, applied: 10, lastindex: 10, lastterm: 1] +INFO 2 switched to configuration voters=(1 2 3) +INFO 2 became follower at term 0 +INFO newRaft 2 [peers: [1,2,3], term: 0, commit: 10, applied: 10, lastindex: 10, lastterm: 1] +INFO 3 switched to configuration voters=(1 2 3) +INFO 3 became follower at term 0 +INFO newRaft 3 [peers: [1,2,3], term: 0, commit: 10, applied: 10, lastindex: 10, lastterm: 1] + +campaign 1 +---- +INFO 1 is starting a new election at term 0 +INFO 1 became candidate at term 1 +INFO 1 [logterm: 1, index: 10] sent MsgVote request to 2 at term 1 +INFO 1 [logterm: 1, index: 10] sent MsgVote request to 3 at term 1 + +stabilize +---- +> 1 handling Ready + Ready MustSync=true: + Lead:0 State:StateCandidate + HardState Term:1 Vote:1 Commit:10 + Messages: + 1->2 MsgVote Term:1 Log:1/10 + 1->3 MsgVote Term:1 Log:1/10 + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:10 Responses:[1->1 MsgVoteResp Term:1 Log:0/0, AppendThread->1 MsgStorageAppendResp Term:1 Log:0/0] +> 2 receiving messages + 1->2 MsgVote Term:1 Log:1/10 + INFO 2 [term: 0] received a MsgVote message with higher term from 1 [term: 1] + INFO 2 became follower at term 1 + INFO 2 [logterm: 1, index: 10, vote: 0] cast MsgVote for 1 [logterm: 1, index: 10] at term 1 +> 3 receiving messages + 1->3 MsgVote Term:1 Log:1/10 + INFO 3 [term: 0] received a MsgVote message with higher term from 1 [term: 1] + INFO 3 became follower at term 1 + INFO 3 [logterm: 1, index: 10, vote: 0] cast MsgVote for 1 [logterm: 1, index: 10] at term 1 +> 1 processing append thread + Processing: + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:10 + Responses: + 1->1 MsgVoteResp Term:1 Log:0/0 + AppendThread->1 MsgStorageAppendResp Term:1 Log:0/0 +> 2 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:10 + Messages: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:10 Responses:[2->1 MsgVoteResp Term:1 Log:0/0, AppendThread->2 MsgStorageAppendResp Term:1 Log:0/0] +> 3 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:10 + Messages: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:10 Responses:[3->1 MsgVoteResp Term:1 Log:0/0, AppendThread->3 MsgStorageAppendResp Term:1 Log:0/0] +> 1 receiving messages + 1->1 MsgVoteResp Term:1 Log:0/0 + INFO 1 received MsgVoteResp from 1 at term 1 + INFO 1 has received 1 MsgVoteResp votes and 0 vote rejections + AppendThread->1 MsgStorageAppendResp Term:1 Log:0/0 +> 2 processing append thread + Processing: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:10 + Responses: + 2->1 MsgVoteResp Term:1 Log:0/0 + AppendThread->2 MsgStorageAppendResp Term:1 Log:0/0 +> 3 processing append thread + Processing: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:10 + Responses: + 3->1 MsgVoteResp Term:1 Log:0/0 + AppendThread->3 MsgStorageAppendResp Term:1 Log:0/0 +> 1 receiving messages + 2->1 MsgVoteResp Term:1 Log:0/0 + INFO 1 received MsgVoteResp from 2 at term 1 + INFO 1 has received 2 MsgVoteResp votes and 0 vote rejections + INFO 1 became leader at term 1 + 3->1 MsgVoteResp Term:1 Log:0/0 +> 2 receiving messages + AppendThread->2 MsgStorageAppendResp Term:1 Log:0/0 +> 3 receiving messages + 
AppendThread->3 MsgStorageAppendResp Term:1 Log:0/0 +> 1 handling Ready + Ready MustSync=true: + Lead:1 State:StateLeader + Entries: + 1/11 EntryNormal "" + Messages: + 1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] + 1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/11 EntryNormal ""] Responses:[1->1 MsgAppResp Term:1 Log:0/11, AppendThread->1 MsgStorageAppendResp Term:1 Log:1/11] +> 2 receiving messages + 1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] +> 3 receiving messages + 1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] +> 1 processing append thread + Processing: + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/11 EntryNormal ""] + Responses: + 1->1 MsgAppResp Term:1 Log:0/11 + AppendThread->1 MsgStorageAppendResp Term:1 Log:1/11 +> 2 handling Ready + Ready MustSync=true: + Lead:1 State:StateFollower + Entries: + 1/11 EntryNormal "" + Messages: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/11 EntryNormal ""] Responses:[2->1 MsgAppResp Term:1 Log:0/11, AppendThread->2 MsgStorageAppendResp Term:1 Log:1/11] +> 3 handling Ready + Ready MustSync=true: + Lead:1 State:StateFollower + Entries: + 1/11 EntryNormal "" + Messages: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/11 EntryNormal ""] Responses:[3->1 MsgAppResp Term:1 Log:0/11, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/11] +> 1 receiving messages + 1->1 MsgAppResp Term:1 Log:0/11 + AppendThread->1 MsgStorageAppendResp Term:1 Log:1/11 +> 2 processing append thread + Processing: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/11 EntryNormal ""] + Responses: + 2->1 MsgAppResp Term:1 Log:0/11 + AppendThread->2 MsgStorageAppendResp Term:1 Log:1/11 +> 3 processing append thread + Processing: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/11 EntryNormal ""] + Responses: + 3->1 MsgAppResp Term:1 Log:0/11 + AppendThread->3 MsgStorageAppendResp Term:1 Log:1/11 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/11 + 3->1 MsgAppResp Term:1 Log:0/11 +> 2 receiving messages + AppendThread->2 MsgStorageAppendResp Term:1 Log:1/11 +> 3 receiving messages + AppendThread->3 MsgStorageAppendResp Term:1 Log:1/11 +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:11 + CommittedEntries: + 1/11 EntryNormal "" + Messages: + 1->2 MsgApp Term:1 Log:1/11 Commit:11 + 1->3 MsgApp Term:1 Log:1/11 Commit:11 + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:11 Responses:[AppendThread->1 MsgStorageAppendResp Term:1 Log:0/0] + 1->ApplyThread MsgStorageApply Term:0 Log:0/11 Entries:[1/11 EntryNormal ""] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/11] +> 2 receiving messages + 1->2 MsgApp Term:1 Log:1/11 Commit:11 +> 3 receiving messages + 1->3 MsgApp Term:1 Log:1/11 Commit:11 +> 1 processing append thread + Processing: + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:11 + Responses: + AppendThread->1 MsgStorageAppendResp Term:1 Log:0/0 +> 1 processing apply thread + Processing: + 1->ApplyThread MsgStorageApply Term:0 Log:0/11 Entries:[1/11 EntryNormal ""] + Responses: + ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/11 +> 2 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:11 + CommittedEntries: + 1/11 EntryNormal "" + Messages: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:11 
Responses:[2->1 MsgAppResp Term:1 Log:0/11, AppendThread->2 MsgStorageAppendResp Term:1 Log:0/0] + 2->ApplyThread MsgStorageApply Term:0 Log:0/11 Entries:[1/11 EntryNormal ""] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/11] +> 3 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:11 + CommittedEntries: + 1/11 EntryNormal "" + Messages: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:11 Responses:[3->1 MsgAppResp Term:1 Log:0/11, AppendThread->3 MsgStorageAppendResp Term:1 Log:0/0] + 3->ApplyThread MsgStorageApply Term:0 Log:0/11 Entries:[1/11 EntryNormal ""] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/11] +> 1 receiving messages + AppendThread->1 MsgStorageAppendResp Term:1 Log:0/0 + ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/11 +> 2 processing append thread + Processing: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:11 + Responses: + 2->1 MsgAppResp Term:1 Log:0/11 + AppendThread->2 MsgStorageAppendResp Term:1 Log:0/0 +> 3 processing append thread + Processing: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:11 + Responses: + 3->1 MsgAppResp Term:1 Log:0/11 + AppendThread->3 MsgStorageAppendResp Term:1 Log:0/0 +> 2 processing apply thread + Processing: + 2->ApplyThread MsgStorageApply Term:0 Log:0/11 Entries:[1/11 EntryNormal ""] + Responses: + ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/11 +> 3 processing apply thread + Processing: + 3->ApplyThread MsgStorageApply Term:0 Log:0/11 Entries:[1/11 EntryNormal ""] + Responses: + ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/11 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/11 + 3->1 MsgAppResp Term:1 Log:0/11 +> 2 receiving messages + AppendThread->2 MsgStorageAppendResp Term:1 Log:0/0 + ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/11 +> 3 receiving messages + AppendThread->3 MsgStorageAppendResp Term:1 Log:0/0 + ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/11 + +propose 1 prop_1 +---- +ok + +process-ready 1 2 3 +---- +> 1 handling Ready + Ready MustSync=true: + Entries: + 1/12 EntryNormal "prop_1" + Messages: + 1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1"] + 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1"] + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] Responses:[1->1 MsgAppResp Term:1 Log:0/12, AppendThread->1 MsgStorageAppendResp Term:1 Log:1/12] +> 2 handling Ready + +> 3 handling Ready + + +deliver-msgs 1 2 3 +---- +1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1"] +1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1"] + +process-ready 1 2 3 +---- +> 1 handling Ready + +> 2 handling Ready + Ready MustSync=true: + Entries: + 1/12 EntryNormal "prop_1" + Messages: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] Responses:[2->1 MsgAppResp Term:1 Log:0/12, AppendThread->2 MsgStorageAppendResp Term:1 Log:1/12] +> 3 handling Ready + Ready MustSync=true: + Entries: + 1/12 EntryNormal "prop_1" + Messages: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] Responses:[3->1 MsgAppResp Term:1 Log:0/12, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/12] + +propose 1 prop_2 +---- +ok + +process-ready 1 2 3 +---- +> 1 handling Ready + Ready MustSync=true: + Entries: + 1/13 EntryNormal "prop_2" + Messages: + 1->2 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal 
"prop_2"] + 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_2"] + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] Responses:[1->1 MsgAppResp Term:1 Log:0/13, AppendThread->1 MsgStorageAppendResp Term:1 Log:1/13] +> 2 handling Ready + +> 3 handling Ready + + +deliver-msgs 1 2 3 +---- +1->2 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_2"] +1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_2"] + +process-ready 1 2 3 +---- +> 1 handling Ready + +> 2 handling Ready + Ready MustSync=true: + Entries: + 1/13 EntryNormal "prop_2" + Messages: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] Responses:[2->1 MsgAppResp Term:1 Log:0/13, AppendThread->2 MsgStorageAppendResp Term:1 Log:1/13] +> 3 handling Ready + Ready MustSync=true: + Entries: + 1/13 EntryNormal "prop_2" + Messages: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] Responses:[3->1 MsgAppResp Term:1 Log:0/13, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/13] + +process-append-thread 1 2 3 +---- +> 1 processing append thread + Processing: + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] + Responses: + 1->1 MsgAppResp Term:1 Log:0/12 + AppendThread->1 MsgStorageAppendResp Term:1 Log:1/12 +> 2 processing append thread + Processing: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] + Responses: + 2->1 MsgAppResp Term:1 Log:0/12 + AppendThread->2 MsgStorageAppendResp Term:1 Log:1/12 +> 3 processing append thread + Processing: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] + Responses: + 3->1 MsgAppResp Term:1 Log:0/12 + AppendThread->3 MsgStorageAppendResp Term:1 Log:1/12 + +deliver-msgs 1 2 3 +---- +1->1 MsgAppResp Term:1 Log:0/12 +AppendThread->1 MsgStorageAppendResp Term:1 Log:1/12 +2->1 MsgAppResp Term:1 Log:0/12 +3->1 MsgAppResp Term:1 Log:0/12 +AppendThread->2 MsgStorageAppendResp Term:1 Log:1/12 +AppendThread->3 MsgStorageAppendResp Term:1 Log:1/12 + +propose 1 prop_3 +---- +ok + +process-ready 1 2 3 +---- +> 1 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:12 + Entries: + 1/14 EntryNormal "prop_3" + CommittedEntries: + 1/12 EntryNormal "prop_1" + Messages: + 1->2 MsgApp Term:1 Log:1/13 Commit:12 + 1->3 MsgApp Term:1 Log:1/13 Commit:12 + 1->2 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] + 1->3 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] HardState: Term:1 Vote:1 Commit:12 Responses:[1->1 MsgAppResp Term:1 Log:0/14, AppendThread->1 MsgStorageAppendResp Term:1 Log:1/14] + 1->ApplyThread MsgStorageApply Term:0 Log:0/12 Entries:[1/12 EntryNormal "prop_1"] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/12] +> 2 handling Ready + +> 3 handling Ready + + +deliver-msgs 1 2 3 +---- +1->2 MsgApp Term:1 Log:1/13 Commit:12 +1->2 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] +1->3 MsgApp Term:1 Log:1/13 Commit:12 +1->3 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] + +process-ready 1 2 3 +---- +> 1 handling Ready + +> 2 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:12 + Entries: + 1/14 EntryNormal "prop_3" + CommittedEntries: + 1/12 EntryNormal "prop_1" + Messages: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/14 
EntryNormal "prop_3"] HardState: Term:1 Vote:1 Commit:12 Responses:[2->1 MsgAppResp Term:1 Log:0/13, 2->1 MsgAppResp Term:1 Log:0/14, AppendThread->2 MsgStorageAppendResp Term:1 Log:1/14] + 2->ApplyThread MsgStorageApply Term:0 Log:0/12 Entries:[1/12 EntryNormal "prop_1"] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/12] +> 3 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:12 + Entries: + 1/14 EntryNormal "prop_3" + CommittedEntries: + 1/12 EntryNormal "prop_1" + Messages: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] HardState: Term:1 Vote:1 Commit:12 Responses:[3->1 MsgAppResp Term:1 Log:0/13, 3->1 MsgAppResp Term:1 Log:0/14, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/14] + 3->ApplyThread MsgStorageApply Term:0 Log:0/12 Entries:[1/12 EntryNormal "prop_1"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/12] + +process-append-thread 1 2 3 +---- +> 1 processing append thread + Processing: + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] + Responses: + 1->1 MsgAppResp Term:1 Log:0/13 + AppendThread->1 MsgStorageAppendResp Term:1 Log:1/13 +> 2 processing append thread + Processing: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] + Responses: + 2->1 MsgAppResp Term:1 Log:0/13 + AppendThread->2 MsgStorageAppendResp Term:1 Log:1/13 +> 3 processing append thread + Processing: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] + Responses: + 3->1 MsgAppResp Term:1 Log:0/13 + AppendThread->3 MsgStorageAppendResp Term:1 Log:1/13 + +deliver-msgs 1 2 3 +---- +1->1 MsgAppResp Term:1 Log:0/13 +AppendThread->1 MsgStorageAppendResp Term:1 Log:1/13 +2->1 MsgAppResp Term:1 Log:0/13 +3->1 MsgAppResp Term:1 Log:0/13 +AppendThread->2 MsgStorageAppendResp Term:1 Log:1/13 +AppendThread->3 MsgStorageAppendResp Term:1 Log:1/13 + +propose 1 prop_4 +---- +ok + +process-ready 1 2 3 +---- +> 1 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:13 + Entries: + 1/15 EntryNormal "prop_4" + CommittedEntries: + 1/13 EntryNormal "prop_2" + Messages: + 1->2 MsgApp Term:1 Log:1/14 Commit:13 + 1->3 MsgApp Term:1 Log:1/14 Commit:13 + 1->2 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] + 1->3 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] HardState: Term:1 Vote:1 Commit:13 Responses:[1->1 MsgAppResp Term:1 Log:0/15, AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15] + 1->ApplyThread MsgStorageApply Term:0 Log:0/13 Entries:[1/13 EntryNormal "prop_2"] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/13] +> 2 handling Ready + +> 3 handling Ready + + +deliver-msgs 1 2 3 +---- +1->2 MsgApp Term:1 Log:1/14 Commit:13 +1->2 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] +1->3 MsgApp Term:1 Log:1/14 Commit:13 +1->3 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] + +process-ready 1 2 3 +---- +> 1 handling Ready + +> 2 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:13 + Entries: + 1/15 EntryNormal "prop_4" + CommittedEntries: + 1/13 EntryNormal "prop_2" + Messages: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] HardState: Term:1 Vote:1 Commit:13 Responses:[2->1 MsgAppResp Term:1 Log:0/14, 2->1 MsgAppResp Term:1 Log:0/15, AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15] + 
2->ApplyThread MsgStorageApply Term:0 Log:0/13 Entries:[1/13 EntryNormal "prop_2"] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/13] +> 3 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:13 + Entries: + 1/15 EntryNormal "prop_4" + CommittedEntries: + 1/13 EntryNormal "prop_2" + Messages: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] HardState: Term:1 Vote:1 Commit:13 Responses:[3->1 MsgAppResp Term:1 Log:0/14, 3->1 MsgAppResp Term:1 Log:0/15, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15] + 3->ApplyThread MsgStorageApply Term:0 Log:0/13 Entries:[1/13 EntryNormal "prop_2"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/13] + +process-append-thread 1 2 3 +---- +> 1 processing append thread + Processing: + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] HardState: Term:1 Vote:1 Commit:12 + Responses: + 1->1 MsgAppResp Term:1 Log:0/14 + AppendThread->1 MsgStorageAppendResp Term:1 Log:1/14 +> 2 processing append thread + Processing: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] HardState: Term:1 Vote:1 Commit:12 + Responses: + 2->1 MsgAppResp Term:1 Log:0/13 + 2->1 MsgAppResp Term:1 Log:0/14 + AppendThread->2 MsgStorageAppendResp Term:1 Log:1/14 +> 3 processing append thread + Processing: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] HardState: Term:1 Vote:1 Commit:12 + Responses: + 3->1 MsgAppResp Term:1 Log:0/13 + 3->1 MsgAppResp Term:1 Log:0/14 + AppendThread->3 MsgStorageAppendResp Term:1 Log:1/14 + +process-apply-thread 1 2 3 +---- +> 1 processing apply thread + Processing: + 1->ApplyThread MsgStorageApply Term:0 Log:0/12 Entries:[1/12 EntryNormal "prop_1"] + Responses: + ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/12 +> 2 processing apply thread + Processing: + 2->ApplyThread MsgStorageApply Term:0 Log:0/12 Entries:[1/12 EntryNormal "prop_1"] + Responses: + ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/12 +> 3 processing apply thread + Processing: + 3->ApplyThread MsgStorageApply Term:0 Log:0/12 Entries:[1/12 EntryNormal "prop_1"] + Responses: + ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/12 + +deliver-msgs 1 2 3 +---- +1->1 MsgAppResp Term:1 Log:0/14 +AppendThread->1 MsgStorageAppendResp Term:1 Log:1/14 +2->1 MsgAppResp Term:1 Log:0/13 +2->1 MsgAppResp Term:1 Log:0/14 +3->1 MsgAppResp Term:1 Log:0/13 +3->1 MsgAppResp Term:1 Log:0/14 +ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/12 +AppendThread->2 MsgStorageAppendResp Term:1 Log:1/14 +ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/12 +AppendThread->3 MsgStorageAppendResp Term:1 Log:1/14 +ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/12 + +process-ready 1 2 3 +---- +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:14 + CommittedEntries: + 1/14 EntryNormal "prop_3" + Messages: + 1->2 MsgApp Term:1 Log:1/15 Commit:14 + 1->3 MsgApp Term:1 Log:1/15 Commit:14 + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:14 Responses:[AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15] + 1->ApplyThread MsgStorageApply Term:0 Log:0/14 Entries:[1/14 EntryNormal "prop_3"] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/14] +> 2 handling Ready + +> 3 handling Ready + + +deliver-msgs 1 2 3 +---- +1->2 MsgApp Term:1 Log:1/15 Commit:14 +1->3 MsgApp Term:1 Log:1/15 Commit:14 + +process-ready 1 2 3 +---- +> 1 handling Ready + +> 2 handling Ready + Ready MustSync=false: + 
HardState Term:1 Vote:1 Commit:14 + CommittedEntries: + 1/14 EntryNormal "prop_3" + Messages: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:14 Responses:[2->1 MsgAppResp Term:1 Log:0/15, AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15] + 2->ApplyThread MsgStorageApply Term:0 Log:0/14 Entries:[1/14 EntryNormal "prop_3"] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/14] +> 3 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:14 + CommittedEntries: + 1/14 EntryNormal "prop_3" + Messages: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:14 Responses:[3->1 MsgAppResp Term:1 Log:0/15, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15] + 3->ApplyThread MsgStorageApply Term:0 Log:0/14 Entries:[1/14 EntryNormal "prop_3"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/14] + +process-append-thread 1 2 3 +---- +> 1 processing append thread + Processing: + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] HardState: Term:1 Vote:1 Commit:13 + Responses: + 1->1 MsgAppResp Term:1 Log:0/15 + AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15 +> 2 processing append thread + Processing: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] HardState: Term:1 Vote:1 Commit:13 + Responses: + 2->1 MsgAppResp Term:1 Log:0/14 + 2->1 MsgAppResp Term:1 Log:0/15 + AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15 +> 3 processing append thread + Processing: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] HardState: Term:1 Vote:1 Commit:13 + Responses: + 3->1 MsgAppResp Term:1 Log:0/14 + 3->1 MsgAppResp Term:1 Log:0/15 + AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15 + +process-apply-thread 1 2 3 +---- +> 1 processing apply thread + Processing: + 1->ApplyThread MsgStorageApply Term:0 Log:0/13 Entries:[1/13 EntryNormal "prop_2"] + Responses: + ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/13 +> 2 processing apply thread + Processing: + 2->ApplyThread MsgStorageApply Term:0 Log:0/13 Entries:[1/13 EntryNormal "prop_2"] + Responses: + ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/13 +> 3 processing apply thread + Processing: + 3->ApplyThread MsgStorageApply Term:0 Log:0/13 Entries:[1/13 EntryNormal "prop_2"] + Responses: + ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/13 + +deliver-msgs 1 2 3 +---- +1->1 MsgAppResp Term:1 Log:0/15 +AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15 +2->1 MsgAppResp Term:1 Log:0/14 +2->1 MsgAppResp Term:1 Log:0/15 +3->1 MsgAppResp Term:1 Log:0/14 +3->1 MsgAppResp Term:1 Log:0/15 +ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/13 +AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15 +ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/13 +AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15 +ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/13 + +process-ready 1 2 3 +---- +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:15 + CommittedEntries: + 1/15 EntryNormal "prop_4" + Messages: + 1->2 MsgApp Term:1 Log:1/15 Commit:15 + 1->3 MsgApp Term:1 Log:1/15 Commit:15 + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:15 Responses:[AppendThread->1 MsgStorageAppendResp Term:1 Log:0/0] + 1->ApplyThread MsgStorageApply Term:0 Log:0/15 Entries:[1/15 EntryNormal "prop_4"] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/15] +> 2 handling Ready + +> 3 handling Ready + + +deliver-msgs 1 2 3 +---- +1->2 
MsgApp Term:1 Log:1/15 Commit:15 +1->3 MsgApp Term:1 Log:1/15 Commit:15 + +process-ready 1 2 3 +---- +> 1 handling Ready + +> 2 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:15 + CommittedEntries: + 1/15 EntryNormal "prop_4" + Messages: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:15 Responses:[2->1 MsgAppResp Term:1 Log:0/15, AppendThread->2 MsgStorageAppendResp Term:1 Log:0/0] + 2->ApplyThread MsgStorageApply Term:0 Log:0/15 Entries:[1/15 EntryNormal "prop_4"] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/15] +> 3 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:15 + CommittedEntries: + 1/15 EntryNormal "prop_4" + Messages: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:15 Responses:[3->1 MsgAppResp Term:1 Log:0/15, AppendThread->3 MsgStorageAppendResp Term:1 Log:0/0] + 3->ApplyThread MsgStorageApply Term:0 Log:0/15 Entries:[1/15 EntryNormal "prop_4"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/15] + +process-append-thread 2 3 +---- +> 2 processing append thread + Processing: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:14 + Responses: + 2->1 MsgAppResp Term:1 Log:0/15 + AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15 +> 3 processing append thread + Processing: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:14 + Responses: + 3->1 MsgAppResp Term:1 Log:0/15 + AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15 + +process-apply-thread 1 2 3 +---- +> 1 processing apply thread + Processing: + 1->ApplyThread MsgStorageApply Term:0 Log:0/14 Entries:[1/14 EntryNormal "prop_3"] + Responses: + ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/14 +> 2 processing apply thread + Processing: + 2->ApplyThread MsgStorageApply Term:0 Log:0/14 Entries:[1/14 EntryNormal "prop_3"] + Responses: + ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/14 +> 3 processing apply thread + Processing: + 3->ApplyThread MsgStorageApply Term:0 Log:0/14 Entries:[1/14 EntryNormal "prop_3"] + Responses: + ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/14 + +deliver-msgs 1 2 3 +---- +2->1 MsgAppResp Term:1 Log:0/15 +3->1 MsgAppResp Term:1 Log:0/15 +ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/14 +AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15 +ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/14 +AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15 +ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/14 + +process-ready 1 2 3 +---- +> 1 handling Ready + +> 2 handling Ready + +> 3 handling Ready + + +process-append-thread 2 3 +---- +> 2 processing append thread + Processing: + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:15 + Responses: + 2->1 MsgAppResp Term:1 Log:0/15 + AppendThread->2 MsgStorageAppendResp Term:1 Log:0/0 +> 3 processing append thread + Processing: + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:15 + Responses: + 3->1 MsgAppResp Term:1 Log:0/15 + AppendThread->3 MsgStorageAppendResp Term:1 Log:0/0 + +process-apply-thread 1 2 3 +---- +> 1 processing apply thread + Processing: + 1->ApplyThread MsgStorageApply Term:0 Log:0/15 Entries:[1/15 EntryNormal "prop_4"] + Responses: + ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/15 +> 2 processing apply thread + Processing: + 2->ApplyThread MsgStorageApply Term:0 Log:0/15 Entries:[1/15 EntryNormal "prop_4"] + Responses: + ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/15 +> 3 
processing apply thread + Processing: + 3->ApplyThread MsgStorageApply Term:0 Log:0/15 Entries:[1/15 EntryNormal "prop_4"] + Responses: + ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/15 + +deliver-msgs 1 2 3 +---- +2->1 MsgAppResp Term:1 Log:0/15 +3->1 MsgAppResp Term:1 Log:0/15 +ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/15 +AppendThread->2 MsgStorageAppendResp Term:1 Log:0/0 +ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/15 +AppendThread->3 MsgStorageAppendResp Term:1 Log:0/0 +ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/15 + +process-ready 1 2 3 +---- +> 1 handling Ready + +> 2 handling Ready + +> 3 handling Ready + + +stabilize +---- +> 1 processing append thread + Processing: + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:14 + Responses: + AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15 + Processing: + 1->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:15 + Responses: + AppendThread->1 MsgStorageAppendResp Term:1 Log:0/0 +> 1 receiving messages + AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15 + AppendThread->1 MsgStorageAppendResp Term:1 Log:0/0 diff --git a/util.go b/util.go index d0872182..d5ed30b3 100644 --- a/util.go +++ b/util.go @@ -41,20 +41,26 @@ func max(a, b uint64) uint64 { } var isLocalMsg = [...]bool{ - pb.MsgHup: true, - pb.MsgBeat: true, - pb.MsgUnreachable: true, - pb.MsgSnapStatus: true, - pb.MsgCheckQuorum: true, + pb.MsgHup: true, + pb.MsgBeat: true, + pb.MsgUnreachable: true, + pb.MsgSnapStatus: true, + pb.MsgCheckQuorum: true, + pb.MsgStorageAppend: true, + pb.MsgStorageAppendResp: true, + pb.MsgStorageApply: true, + pb.MsgStorageApplyResp: true, } var isResponseMsg = [...]bool{ - pb.MsgAppResp: true, - pb.MsgVoteResp: true, - pb.MsgHeartbeatResp: true, - pb.MsgUnreachable: true, - pb.MsgReadIndexResp: true, - pb.MsgPreVoteResp: true, + pb.MsgAppResp: true, + pb.MsgVoteResp: true, + pb.MsgHeartbeatResp: true, + pb.MsgUnreachable: true, + pb.MsgReadIndexResp: true, + pb.MsgPreVoteResp: true, + pb.MsgStorageAppendResp: true, + pb.MsgStorageApplyResp: true, } func isMsgInArray(msgt pb.MessageType, arr []bool) bool { @@ -70,6 +76,10 @@ func IsResponseMsg(msgt pb.MessageType) bool { return isMsgInArray(msgt, isResponseMsg[:]) } +func IsLocalMsgTarget(id uint64) bool { + return id == LocalAppendThread || id == LocalApplyThread +} + // voteResponseType maps vote and prevote message types to their corresponding responses. func voteRespMsgType(msgt pb.MessageType) pb.MessageType { switch msgt { @@ -153,7 +163,8 @@ type EntryFormatter func([]byte) string // Message for debugging. 
func DescribeMessage(m pb.Message, f EntryFormatter) string { var buf bytes.Buffer - fmt.Fprintf(&buf, "%x->%x %v Term:%d Log:%d/%d", m.From, m.To, m.Type, m.Term, m.LogTerm, m.Index) + fmt.Fprintf(&buf, "%s->%s %v Term:%d Log:%d/%d", + describeTarget(m.From), describeTarget(m.To), m.Type, m.Term, m.LogTerm, m.Index) if m.Reject { fmt.Fprintf(&buf, " Rejected (Hint: %d)", m.RejectHint) } @@ -170,12 +181,38 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string { } fmt.Fprint(&buf, "]") } + if m.HardState != nil { + fmt.Fprintf(&buf, " HardState: %s", DescribeHardState(*m.HardState)) + } if s := m.Snapshot; s != nil && !IsEmptySnap(*s) { fmt.Fprintf(&buf, " Snapshot: %s", DescribeSnapshot(*s)) } + if len(m.Responses) > 0 { + fmt.Fprintf(&buf, " Responses:[") + for i, m := range m.Responses { + if i != 0 { + buf.WriteString(", ") + } + buf.WriteString(DescribeMessage(m, f)) + } + fmt.Fprintf(&buf, "]") + } return buf.String() } +func describeTarget(id uint64) string { + switch id { + case None: + return "None" + case LocalAppendThread: + return "AppendThread" + case LocalApplyThread: + return "ApplyThread" + default: + return fmt.Sprintf("%x", id) + } +} + // PayloadSize is the size of the payload of this Entry. Notably, it does not // depend on its Index or Term. func PayloadSize(e pb.Entry) int { diff --git a/util_test.go b/util_test.go index d8f69dbb..e711ec16 100644 --- a/util_test.go +++ b/util_test.go @@ -89,6 +89,10 @@ func TestIsLocalMsg(t *testing.T) { {pb.MsgReadIndexResp, false}, {pb.MsgPreVote, false}, {pb.MsgPreVoteResp, false}, + {pb.MsgStorageAppend, true}, + {pb.MsgStorageAppendResp, true}, + {pb.MsgStorageApply, true}, + {pb.MsgStorageApplyResp, true}, } for _, tt := range tests { @@ -122,6 +126,10 @@ func TestIsResponseMsg(t *testing.T) { {pb.MsgReadIndexResp, true}, {pb.MsgPreVote, false}, {pb.MsgPreVoteResp, true}, + {pb.MsgStorageAppend, false}, + {pb.MsgStorageAppendResp, true}, + {pb.MsgStorageApply, false}, + {pb.MsgStorageApplyResp, true}, } for i, tt := range tests {
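To close out the util.go changes: IsLocalMsgTarget is what lets a Ready handler separate local storage work from network traffic when AsyncStorageWrites is enabled, in the same way the test harness's ProcessReady routes messages above. A rough application-side sketch of that routing; the package name raftapp and the function routeReadyMessages are illustrative only and not part of this change:

	package raftapp // illustration only, not part of the library

	import (
		"fmt"

		"go.etcd.io/raft/v3"
		"go.etcd.io/raft/v3/raftpb"
	)

	// routeReadyMessages splits a Ready's messages into network traffic and
	// per-thread storage work, using the new IsLocalMsgTarget predicate.
	func routeReadyMessages(msgs []raftpb.Message) (network, appendWork, applyWork []raftpb.Message) {
		for _, m := range msgs {
			if !raft.IsLocalMsgTarget(m.To) {
				network = append(network, m)
				continue
			}
			switch m.Type {
			case raftpb.MsgStorageAppend:
				appendWork = append(appendWork, m)
			case raftpb.MsgStorageApply:
				applyWork = append(applyWork, m)
			default:
				panic(fmt.Sprintf("unexpected local target message type %s", m.Type))
			}
		}
		return network, appendWork, applyWork
	}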