Skip to content

Commit

Permalink
Merge pull request #10892 from tbg/rawnode-everywhere-attempt3
Browse files Browse the repository at this point in the history
raft: use RawNode for node's event loop; clean up bootstrap
  • Loading branch information
tbg committed Jul 19, 2019
2 parents 233be58 + caa48bc commit 3c5e2f5
Show file tree
Hide file tree
Showing 8 changed files with 413 additions and 358 deletions.
6 changes: 5 additions & 1 deletion etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,11 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
}
}

n = raft.StartNode(c, peers)
if len(peers) == 0 {
n = raft.RestartNode(c)
} else {
n = raft.StartNode(c, peers)
}
raftStatusMu.Lock()
raftStatus = n.Status
raftStatusMu.Unlock()
Expand Down
80 changes: 80 additions & 0 deletions raft/bootstrap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package raft

import (
"errors"

pb "go.etcd.io/etcd/raft/raftpb"
)

// Bootstrap initializes the RawNode for first use by appending configuration
// changes for the supplied peers. This method returns an error if the Storage
// is nonempty.
//
// It is recommended that instead of calling this method, applications bootstrap
// their state manually by setting up a Storage that has a first index > 1 and
// which stores the desired ConfState as its InitialState.
func (rn *RawNode) Bootstrap(peers []Peer) error {
if len(peers) == 0 {
return errors.New("must provide at least one peer to Bootstrap")
}
lastIndex, err := rn.raft.raftLog.storage.LastIndex()
if err != nil {
return err
}

if lastIndex != 0 {
return errors.New("can't bootstrap a nonempty Storage")
}

// We've faked out initial entries above, but nothing has been
// persisted. Start with an empty HardState (thus the first Ready will
// emit a HardState update for the app to persist).
rn.prevHardSt = emptyState

// TODO(tbg): remove StartNode and give the application the right tools to
// bootstrap the initial membership in a cleaner way.
rn.raft.becomeFollower(1, None)
ents := make([]pb.Entry, len(peers))
for i, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
data, err := cc.Marshal()
if err != nil {
return err
}

ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
}
rn.raft.raftLog.append(ents...)

// Now apply them, mainly so that the application can call Campaign
// immediately after StartNode in tests. Note that these nodes will
// be added to raft twice: here and when the application's Ready
// loop calls ApplyConfChange. The calls to addNode must come after
// all calls to raftLog.append so progress.next is set after these
// bootstrapping entries (it is an error if we try to append these
// entries since they have already been committed).
// We do not set raftLog.applied so the application will be able
// to observe all conf changes via Ready.CommittedEntries.
//
// TODO(bdarnell): These entries are still unstable; do we need to preserve
// the invariant that committed < unstable?
rn.raft.raftLog.committed = uint64(len(ents))
for _, peer := range peers {
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
}
return nil
}
121 changes: 33 additions & 88 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,52 +197,22 @@ type Peer struct {

// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
//
// Peers must not be zero length; call RestartNode in that case.
func StartNode(c *Config, peers []Peer) Node {
r := newRaft(c)
// become the follower at term 1 and apply initial configuration
// entries of term 1
r.becomeFollower(1, None)
for _, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
d, err := cc.Marshal()
if err != nil {
panic("unexpected marshal error")
}
// TODO(tbg): this should append the ConfChange for the own node first
// and also call applyConfChange below for that node first. Otherwise
// we have a Raft group (for a little while) that doesn't have itself
// in its config, which is bad.
// This whole way of setting things up is rickety. The app should just
// populate the initial ConfState appropriately and then all of this
// goes away.
e := pb.Entry{
Type: pb.EntryConfChange,
Term: 1,
Index: r.raftLog.lastIndex() + 1,
Data: d,
}
r.raftLog.append(e)
if len(peers) == 0 {
panic("no peers given; use RestartNode instead")
}
// Mark these initial entries as committed.
// TODO(bdarnell): These entries are still unstable; do we need to preserve
// the invariant that committed < unstable?
r.raftLog.committed = r.raftLog.lastIndex()
// Now apply them, mainly so that the application can call Campaign
// immediately after StartNode in tests. Note that these nodes will
// be added to raft twice: here and when the application's Ready
// loop calls ApplyConfChange. The calls to addNode must come after
// all calls to raftLog.append so progress.next is set after these
// bootstrapping entries (it is an error if we try to append these
// entries since they have already been committed).
// We do not set raftLog.applied so the application will be able
// to observe all conf changes via Ready.CommittedEntries.
for _, peer := range peers {
r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
rn, err := NewRawNode(c)
if err != nil {
panic(err)
}
rn.Bootstrap(peers)

n := newNode()
n.logger = c.Logger
go n.run(r)

go n.run(rn)
return &n
}

Expand All @@ -251,11 +221,13 @@ func StartNode(c *Config, peers []Peer) Node {
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node {
r := newRaft(c)

rn, err := NewRawNode(c)
if err != nil {
panic(err)
}
n := newNode()
n.logger = c.Logger
go n.run(r)
go n.run(rn)
return &n
}

Expand Down Expand Up @@ -310,30 +282,30 @@ func (n *node) Stop() {
<-n.done
}

func (n *node) run(r *raft) {
func (n *node) run(rn *RawNode) {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var prevLastUnstablei, prevLastUnstablet uint64
var havePrevLastUnstablei bool
var prevSnapi uint64
var applyingToI uint64
var rd Ready

r := rn.raft

lead := None
prevSoftSt := r.softState()
prevHardSt := emptyState

for {
if advancec != nil {
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
} else if rn.HasReady() {
// Populate a Ready. Note that this Ready is not guaranteed to
// actually be handled. We will arm readyc, but there's no guarantee
// that we will actually send on it. It's possible that we will
// service another channel instead, loop around, and then populate
// the Ready again. We could instead force the previous Ready to be
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
rd = rn.Ready()
readyc = n.readyc
}

if lead != r.lead {
Expand Down Expand Up @@ -382,40 +354,13 @@ func (n *node) run(r *raft) {
case <-n.done:
}
case <-n.tickc:
r.tick()
rn.Tick()
case readyc <- rd:
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
}
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
prevHardSt = rd.HardState
}
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
if index := rd.appliedCursor(); index != 0 {
applyingToI = index
}

r.msgs = nil
r.readStates = nil
r.reduceUncommittedSize(rd.CommittedEntries)
rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
if applyingToI != 0 {
r.raftLog.appliedTo(applyingToI)
applyingToI = 0
}
if havePrevLastUnstablei {
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
havePrevLastUnstablei = false
}
r.raftLog.stableSnapTo(prevSnapi)
rn.commitReady(rd)
rd = Ready{}
advancec = nil
case c := <-n.status:
c <- getStatus(r)
Expand Down
4 changes: 2 additions & 2 deletions raft/node_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func BenchmarkOneNode(b *testing.B) {

n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
go n.run(rn)

defer n.Stop()

Expand Down
Loading

0 comments on commit 3c5e2f5

Please sign in to comment.