Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: allow use of joint quorums #10914

Merged
merged 4 commits into from
Jul 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions clientv3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"time"

bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/etcdserver/api/membership"
Expand All @@ -43,8 +44,6 @@ import (
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/wal"
"go.etcd.io/etcd/wal/walpb"

bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -482,7 +481,7 @@ func (s *v3Manager) saveWALAndSnap() error {
Index: commit,
Term: term,
ConfState: raftpb.ConfState{
Nodes: nodeIDs,
Voters: nodeIDs,
},
},
}
Expand Down
3 changes: 1 addition & 2 deletions etcdserver/api/snap/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ import (
"testing"

"go.etcd.io/etcd/raft/raftpb"

"go.uber.org/zap"
)

var testSnap = &raftpb.Snapshot{
Data: []byte("some snapshot"),
Metadata: raftpb.SnapshotMetadata{
ConfState: raftpb.ConfState{
Nodes: []uint64{1, 2, 3},
Voters: []uint64{1, 2, 3},
},
Index: 1,
Term: 1,
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
func getIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
ids := make(map[uint64]bool)
if snap != nil {
for _, id := range snap.Metadata.ConfState.Nodes {
for _, id := range snap.Metadata.ConfState.Voters {
ids[id] = true
}
}
Expand Down
16 changes: 8 additions & 8 deletions etcdserver/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@ func TestGetIDs(t *testing.T) {
widSet []uint64
}{
{nil, []raftpb.Entry{}, []uint64{}},
{&raftpb.ConfState{Nodes: []uint64{1}},
{&raftpb.ConfState{Voters: []uint64{1}},
[]raftpb.Entry{}, []uint64{1}},
{&raftpb.ConfState{Nodes: []uint64{1}},
{&raftpb.ConfState{Voters: []uint64{1}},
[]raftpb.Entry{addEntry}, []uint64{1, 2}},
{&raftpb.ConfState{Nodes: []uint64{1}},
{&raftpb.ConfState{Voters: []uint64{1}},
[]raftpb.Entry{addEntry, removeEntry}, []uint64{1}},
{&raftpb.ConfState{Nodes: []uint64{1}},
{&raftpb.ConfState{Voters: []uint64{1}},
[]raftpb.Entry{addEntry, normalEntry}, []uint64{1, 2}},
{&raftpb.ConfState{Nodes: []uint64{1}},
{&raftpb.ConfState{Voters: []uint64{1}},
[]raftpb.Entry{addEntry, normalEntry, updateEntry}, []uint64{1, 2}},
{&raftpb.ConfState{Nodes: []uint64{1}},
{&raftpb.ConfState{Voters: []uint64{1}},
[]raftpb.Entry{addEntry, removeEntry, normalEntry}, []uint64{1}},
}

Expand Down Expand Up @@ -178,8 +178,8 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
}
}

// TestConfgChangeBlocksApply ensures apply blocks if committed entries contain config-change.
func TestConfgChangeBlocksApply(t *testing.T) {
// TestConfigChangeBlocksApply ensures apply blocks if committed entries contain config-change.
func TestConfigChangeBlocksApply(t *testing.T) {
n := newNopReadyNode()

r := newRaftNode(raftNodeConfig{
Expand Down
37 changes: 26 additions & 11 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"testing"
"time"

"go.uber.org/zap"

"go.etcd.io/etcd/etcdserver/api/membership"
"go.etcd.io/etcd/etcdserver/api/rafthttp"
"go.etcd.io/etcd/etcdserver/api/snap"
Expand All @@ -49,6 +47,7 @@ import (
"go.etcd.io/etcd/pkg/wait"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.uber.org/zap"
)

// TestDoLocalAction tests requests which do not need to go through raft to be applied,
Expand Down Expand Up @@ -1017,7 +1016,7 @@ func TestSnapshot(t *testing.T) {
}
}()

srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
<-ch
<-ch
}
Expand Down Expand Up @@ -1632,7 +1631,7 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
n.Record(testutil.Action{Name: "Propose", Params: []interface{}{data}})
return nil
}
func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error {
n.Record(testutil.Action{Name: "ProposeConfChange"})
return nil
}
Expand All @@ -1645,7 +1644,7 @@ func (n *nodeRecorder) Ready() <-chan raft.Ready
func (n *nodeRecorder) TransferLeadership(ctx context.Context, lead, transferee uint64) {}
func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil }
func (n *nodeRecorder) Advance() {}
func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState {
n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
return &raftpb.ConfState{}
}
Expand Down Expand Up @@ -1706,21 +1705,37 @@ func newNodeConfChangeCommitterStream() *nodeConfChangeCommitterRecorder {
return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0}
}

func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
data, err := conf.Marshal()
func confChangeActionName(conf raftpb.ConfChangeI) string {
var s string
if confV1, ok := conf.AsV1(); ok {
s = confV1.Type.String()
} else {
for i, chg := range conf.AsV2().Changes {
if i > 0 {
s += "/"
}
s += chg.Type.String()
}
}
return s
}

func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error {
typ, data, err := raftpb.MarshalConfChange(conf)
if err != nil {
return err
}

n.index++
n.Record(testutil.Action{Name: "ProposeConfChange:" + conf.Type.String()})
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: raftpb.EntryConfChange, Data: data}}}
n.Record(testutil.Action{Name: "ProposeConfChange:" + confChangeActionName(conf)})
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: typ, Data: data}}}
return nil
}
func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
return n.readyc
}
func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
n.Record(testutil.Action{Name: "ApplyConfChange:" + conf.Type.String()})
func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState {
n.Record(testutil.Action{Name: "ApplyConfChange:" + confChangeActionName(conf)})
return &raftpb.ConfState{}
}

Expand Down
2 changes: 1 addition & 1 deletion raft/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {
// the invariant that committed < unstable?
rn.raft.raftLog.committed = uint64(len(ents))
for _, peer := range peers {
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
}
return nil
}
22 changes: 13 additions & 9 deletions raft/confchange/confchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ type Changer struct {
// to
// (1 2 3)&&(1 2 3).
//
// The supplied ConfChanges are then applied to the incoming majority config,
// The supplied changes are then applied to the incoming majority config,
// resulting in a joint configuration that in terms of the Raft thesis[1]
// (Section 4.3) corresponds to `C_{new,old}`.
//
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
func (c Changer) EnterJoint(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressMap, error) {
func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
cfg, prs, err := c.checkAndCopy()
if err != nil {
return c.err(err)
Expand Down Expand Up @@ -74,7 +74,7 @@ func (c Changer) EnterJoint(ccs ...pb.ConfChange) (tracker.Config, tracker.Progr
if err := c.apply(&cfg, prs, ccs...); err != nil {
return c.err(err)
}

cfg.AutoLeave = autoLeave
return checkAndReturn(cfg, prs)
}

Expand Down Expand Up @@ -120,6 +120,7 @@ func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
}
}
*outgoingPtr(&cfg.Voters) = nil
cfg.AutoLeave = false

return checkAndReturn(cfg, prs)
}
Expand All @@ -129,7 +130,7 @@ func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
// will return an error if that is not the case, if the resulting quorum is
// zero, or if the configuration is in a joint state (i.e. if there is an
// outgoing configuration).
func (c Changer) Simple(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressMap, error) {
func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
cfg, prs, err := c.checkAndCopy()
if err != nil {
return c.err(err)
Expand All @@ -142,7 +143,7 @@ func (c Changer) Simple(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressM
return c.err(err)
}
if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 {
return tracker.Config{}, nil, errors.New("more than voter changed without entering joint config")
return tracker.Config{}, nil, errors.New("more than one voter changed without entering joint config")
}
if err := checkInvariants(cfg, prs); err != nil {
return tracker.Config{}, tracker.ProgressMap{}, nil
Expand All @@ -151,14 +152,14 @@ func (c Changer) Simple(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressM
return checkAndReturn(cfg, prs)
}

// apply a ConfChange to the configuration. By convention, changes to voters are
// apply a change to the configuration. By convention, changes to voters are
// always made to the incoming majority config Voters[0]. Voters[1] is either
// empty or preserves the outgoing majority configuration while in a joint state.
func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.ConfChange) error {
func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.ConfChangeSingle) error {
for _, cc := range ccs {
if cc.NodeID == 0 {
// etcd replaces the NodeID with zero if it decides (downstream of
// raft) to not apply a ConfChange, so we have to have explicit code
// raft) to not apply a change, so we have to have explicit code
// here to ignore these.
continue
}
Expand Down Expand Up @@ -327,6 +328,9 @@ func checkInvariants(cfg tracker.Config, prs tracker.ProgressMap) error {
if cfg.LearnersNext != nil {
return fmt.Errorf("LearnersNext must be nil when not joint")
}
if cfg.AutoLeave {
return fmt.Errorf("AutoLeave must be false when not joint")
}
}

return nil
Expand Down Expand Up @@ -408,7 +412,7 @@ func outgoingPtr(voters *quorum.JointConfig) *quorum.MajorityConfig { return &vo

// Describe prints the type and NodeID of the configuration changes as a
// space-delimited string.
func Describe(ccs ...pb.ConfChange) string {
func Describe(ccs ...pb.ConfChangeSingle) string {
var buf strings.Builder
for _, cc := range ccs {
if buf.Len() > 0 {
Expand Down
10 changes: 7 additions & 3 deletions raft/confchange/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestConfChangeDataDriven(t *testing.T) {
defer func() {
c.LastIndex++
}()
var ccs []pb.ConfChange
var ccs []pb.ConfChangeSingle
toks := strings.Split(strings.TrimSpace(d.Input), " ")
if toks[0] == "" {
toks = nil
Expand All @@ -57,7 +57,7 @@ func TestConfChangeDataDriven(t *testing.T) {
if len(tok) < 2 {
return fmt.Sprintf("unknown token %s", tok)
}
var cc pb.ConfChange
var cc pb.ConfChangeSingle
switch tok[0] {
case 'v':
cc.Type = pb.ConfChangeAddNode
Expand Down Expand Up @@ -85,7 +85,11 @@ func TestConfChangeDataDriven(t *testing.T) {
case "simple":
cfg, prs, err = c.Simple(ccs...)
case "enter-joint":
cfg, prs, err = c.EnterJoint(ccs...)
var autoLeave bool
if len(d.CmdArgs) > 0 {
d.ScanArgs(t, "autoleave", &autoLeave)
}
cfg, prs, err = c.EnterJoint(autoLeave, ccs...)
case "leave-joint":
if len(ccs) > 0 {
err = errors.New("this command takes no input")
Expand Down
Loading