Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce follower replication #11455

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 228 additions & 0 deletions raft/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package raft

import (
"go.etcd.io/etcd/raft/tracker"
)

// Group describes one replication group: an identifier plus the IDs of the
// peers that belong to it.
type Group struct {
// ID identifies the group. IsValid rejects a Group whose ID is None.
ID uint64
// Members lists the IDs of the peers in this group. IsValid rejects a Group
// with no members.
Members []uint64
}

// IsValid reports whether the group can be used: its ID must differ from None
// and it must list at least one member.
func (g *Group) IsValid() bool {
	if g.ID == None {
		return false
	}
	return len(g.Members) != 0
}

// MemberInfo records one peer's place in the group system.
type MemberInfo struct {
// Delegate is the ID of the peer picked as delegate for this member's group,
// or None while no delegate has been picked. A delegate's own entry carries
// its own ID here.
Delegate uint64
// Group is the ID of the group the member belongs to.
Group uint64
}

// Groups tracks, for every peer known to the group system, which group it
// belongs to and which peer (if any) has been picked as that group's delegate.
type Groups struct {
// LeaderGroupID is the ID of the leader's group. pickDelegate never picks a
// delegate for peers that belong to this group.
LeaderGroupID uint64
// BcastTargets maps a group delegate to the other members of the same group
// that it serves. Peers appearing in this map should be mutually exclusive
// with `unresolvedPeers`.
BcastTargets map[uint64][]uint64
// Indexes maps a node ID to its group/delegate information.
Indexes map[uint64]*MemberInfo
// unresolvedPeers holds peers queued for the next ResolveDelegates call,
// which tries to pick a delegate for each of them.
unresolvedPeers []uint64
}

// NewGroups builds a `Groups` from a collection of `Group` configurations.
//
// Configurations for which IsValid returns false are skipped. Every member of
// a valid group is indexed under its group ID with no delegate (None) and is
// queued as unresolved, in the order the members are listed. The leader group
// ID starts out as None.
func NewGroups(configs []Group) Groups {
	indexes := make(map[uint64]*MemberInfo)
	unresolved := make([]uint64, 0)
	for i := range configs {
		cfg := &configs[i]
		if !cfg.IsValid() {
			continue
		}
		for _, id := range cfg.Members {
			indexes[id] = &MemberInfo{Group: cfg.ID, Delegate: None}
			unresolved = append(unresolved, id)
		}
	}
	return Groups{
		LeaderGroupID:   None,
		BcastTargets:    make(map[uint64][]uint64),
		Indexes:         indexes,
		unresolvedPeers: unresolved,
	}
}

// GetMemberInfo returns the group information recorded for `member`, or nil
// if the member is not part of the Groups system.
func (g *Groups) GetMemberInfo(member uint64) *MemberInfo {
	info, ok := g.Indexes[member]
	if !ok {
		return nil
	}
	return info
}

// GetDelegate returns the delegate recorded for `peer`, which may be `peer`
// itself when it is the delegate of its group. It returns None when the peer
// is unknown or no delegate has been picked for it.
func (g *Groups) GetDelegate(peer uint64) uint64 {
	info, known := g.Indexes[peer]
	if !known {
		return None
	}
	return info.Delegate
}

// pickDelegate picks a delegate for the group that `to` belongs to: among the
// group members that have an entry in `prs`, the one with the biggest `Match`.
//
// Nothing happens when `to` is not part of the Groups system, when it already
// has a delegate, or when it belongs to the leader's group. Members without a
// progress entry are skipped and keep their current state. A member only
// becomes the front-runner when its `Match` is strictly greater than the best
// seen so far (which starts at None), so on a tie the member visited first
// wins; the visiting order comes from candidateDelegates and is not stable.
//
// A delegate is recorded only if it has at least one other member to serve;
// otherwise the group is left untouched.
func (g *Groups) pickDelegate(to uint64, prs tracker.ProgressMap) {
	info, ok := g.Indexes[to]
	if !ok {
		// The peer is not included in Groups system, just ignore it
		return
	}
	if info.Delegate != None {
		// The delegate has already been picked
		return
	}
	if info.Group == g.LeaderGroupID {
		// Never pick delegate for the member in the leader's group
		return
	}

	chosen, match := None, None
	var bcastTargets []uint64
	for _, member := range g.candidateDelegates(info.Group) {
		pr := prs[member]
		if pr == nil {
			continue
		}
		if pr.Match > match {
			if chosen != None {
				// The previous front-runner has been overtaken, so it is the
				// one that must be delegated, not the new winner.
				bcastTargets = append(bcastTargets, chosen)
			}
			chosen, match = member, pr.Match
		} else {
			bcastTargets = append(bcastTargets, member)
		}
	}

	// A lone candidate has nobody to serve; leave the group as it is.
	if chosen == None || len(bcastTargets) == 0 {
		return
	}
	g.Indexes[chosen].Delegate = chosen
	for _, peer := range bcastTargets {
		g.Indexes[peer].Delegate = chosen
	}
	g.BcastTargets[chosen] = bcastTargets
}

// IsDelegated reports whether `peer` is served by a delegate other than
// itself. It is false for unknown peers, for peers without a delegate, and
// for peers that are delegates themselves.
func (g *Groups) IsDelegated(peer uint64) bool {
	info, ok := g.Indexes[peer]
	if !ok {
		return false
	}
	switch info.Delegate {
	case None, peer:
		return false
	default:
		return true
	}
}

// UpdateGroup moves `peer` into `group` and reports whether any peers are
// waiting for a delegate afterwards.
//
// A `group` of None removes the peer from the Groups system. If the peer is
// already in `group`, nothing changes and the result is false regardless of
// pending unresolved peers. Otherwise the peer is unmarked from its old group
// (if it had one) and marked in the new one.
func (g *Groups) UpdateGroup(peer uint64, group uint64) bool {
	if group == None {
		g.unmarkPeer(peer)
		return len(g.unresolvedPeers) != 0
	}
	if current, tracked := g.Indexes[peer]; tracked {
		if current.Group == group {
			// Same group as before, nothing to update
			return false
		}
		// Leaving the old group comes first; markPeer expects a clean slate
		g.unmarkPeer(peer)
	}
	g.markPeer(peer, group)
	return len(g.unresolvedPeers) != 0
}

// unmarkPeer removes `peer` from the Groups system. Unknown peers are ignored.
//
// If `peer` is a delegate, only the delegate itself is dropped: RemoveDelegate
// resets the members it served and queues them as unresolved. Otherwise the
// peer is taken out of its delegate's broadcast targets, if it has a delegate.
// In every case the peer's index entry is gone when this returns, so a
// following markPeer cannot trip over a stale entry.
func (g *Groups) unmarkPeer(peer uint64) {
	info, ok := g.Indexes[peer]
	if !ok {
		return
	}
	if info.Delegate == peer {
		g.RemoveDelegate(peer)
	} else if targets, ok := g.BcastTargets[info.Delegate]; ok {
		// The peer is not a delegate, exclude it from the delegated members
		for i, p := range targets {
			if p == peer {
				g.BcastTargets[info.Delegate] = append(targets[:i], targets[i+1:]...)
				// Stop here: the slice has just been shifted in place, so
				// iterating further would walk over moved elements.
				break
			}
		}
	}
	// RemoveDelegate only deletes the entry when the peer has broadcast
	// targets registered; delete unconditionally to cover every path.
	delete(g.Indexes, peer)
}

// markPeer adds `peer` to the Groups system as a member of `group`.
//
// The peer inherits the delegate recorded on an existing member of that
// group, if one is found. With a delegate, the peer joins that delegate's
// broadcast targets. With group mates but no delegate yet, the peer is queued
// as unresolved. When it is the first member of the group it is only indexed.
//
// It panics if `peer` is already indexed; callers must unmark it first.
func (g *Groups) markPeer(peer uint64, group uint64) {
	delegate, hasGroupMate := None, false
	for _, other := range g.Indexes {
		if other.Group != group {
			continue
		}
		// The delegate could still be `None` here
		delegate, hasGroupMate = other.Delegate, true
		break
	}
	if _, exists := g.Indexes[peer]; exists {
		panic("Peer should never exist in indexes before marking")
	}
	g.Indexes[peer] = &MemberInfo{Group: group, Delegate: delegate}
	switch {
	case delegate != None:
		g.BcastTargets[delegate] = append(g.BcastTargets[delegate], peer)
	case hasGroupMate:
		// No delegate in given `group` has been picked, add `peer` into unresolved peers
		g.unresolvedPeers = append(g.unresolvedPeers, peer)
	}
}

// ResolveDelegates tries to pick a delegate for every peer queued as
// unresolved, using the replication progress in `prs`, then empties the queue
// while keeping its backing storage for reuse.
func (g *Groups) ResolveDelegates(prs tracker.ProgressMap) {
	pending := g.unresolvedPeers
	for i := range pending {
		g.pickDelegate(pending[i], prs)
	}
	g.unresolvedPeers = g.unresolvedPeers[:0]
}

// RemoveDelegate removes the given `delegate` peer from Groups system.
// This should be called only when the delegate peer is detected to be unreachable.
//
// It does nothing unless `delegate` currently has broadcast targets
// registered. Otherwise the delegate's index entry and target list are
// dropped, and every peer that pointed at it loses its delegate and is queued
// as unresolved.
func (g *Groups) RemoveDelegate(delegate uint64) {
	if _, isDelegate := g.BcastTargets[delegate]; !isDelegate {
		return
	}
	// Since `delegate` is unreachable, remove it from `Indexes`
	delete(g.Indexes, delegate)
	delete(g.BcastTargets, delegate)
	for id, member := range g.Indexes {
		if member.Delegate != delegate {
			continue
		}
		member.Delegate = None
		g.unresolvedPeers = append(g.unresolvedPeers, id)
	}
}

// candidateDelegates returns the IDs of all indexed members of the given
// group. The result is never nil; its order follows map iteration and is
// therefore not stable between calls.
func (g *Groups) candidateDelegates(groupID uint64) []uint64 {
	members := []uint64{}
	for id, info := range g.Indexes {
		if info.Group != groupID {
			continue
		}
		members = append(members, id)
	}
	return members
}
132 changes: 132 additions & 0 deletions raft/group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package raft

import (
"testing"

"go.etcd.io/etcd/raft/tracker"
)

// next_delegate_and_targets returns one delegate => targets pair from
// groups.BcastTargets. With several entries the pair returned is whichever
// the map iteration visits last, so callers should only rely on it when at
// most one delegate exists. For an empty map it returns a zero delegate and
// an empty, non-nil slice.
func next_delegate_and_targets(groups *Groups) (uint64, []uint64) {
	var (
		delegate uint64
		targets  = []uint64{}
	)
	for id, members := range groups.BcastTargets {
		delegate, targets = id, members
	}
	return delegate, targets
}

// TestGroupSystem walks a Groups instance through its lifecycle: building it
// from configs, resolving delegates, removing delegates and delegated peers,
// and moving peers between groups with UpdateGroup.
func TestGroupSystem(t *testing.T) {
	groups := NewGroups([]Group{
		{
			ID:      1,
			Members: []uint64{1, 2},
		}, {
			ID:      2,
			Members: []uint64{3, 4, 5},
		}, {
			ID:      3,
			Members: []uint64{6},
		},
	})
	if len(groups.unresolvedPeers) != 6 {
		t.Fatalf("expect unsolved peers length: %d, but got: %d", 6, len(groups.unresolvedPeers))
	}
	groups.LeaderGroupID = 1
	prs := tracker.MakeProgressTracker(256)
	for i := 1; i <= 6; i++ {
		prs.Progress[uint64(i)] = &tracker.Progress{
			Match:     99,
			Next:      100,
			Inflights: tracker.NewInflights(100),
		}
	}
	// Group 1 is the leader's group and group 3 has a single member, so only
	// group 2 can end up with a delegate.
	groups.ResolveDelegates(prs.Progress)
	if len(groups.BcastTargets) != 1 || groups.IsDelegated(1) || groups.IsDelegated(6) {
		t.Fatalf("only the delegate of group 2 should be picked")
	}

	// Remove a delegate which doesn't exist
	groups.RemoveDelegate(6)
	if len(groups.unresolvedPeers) != 0 {
		t.Fatalf("All the peers should be resolved")
	}

	delegate, targets := next_delegate_and_targets(&groups)
	if len(targets) != 2 {
		t.Fatalf("Unexpected number of peers delegated")
	}

	// Remove a delegated peer but not a delegate. Which member of group 2
	// became the delegate depends on map iteration order, so derive a
	// non-delegate from whichever one was picked.
	var toBeRemoved uint64
	switch delegate {
	case 3:
		toBeRemoved = 4
	case 4:
		toBeRemoved = 5
	case 5:
		toBeRemoved = 3
	default:
		t.Fatalf("Unexpected delegate")
	}
	groups.RemoveDelegate(toBeRemoved)
	if len(groups.unresolvedPeers) != 0 {
		t.Fatalf("All the peers should be resolved")
	}

	// Remove a delegate from group system
	groups.RemoveDelegate(delegate)
	if len(groups.unresolvedPeers) != 2 {
		t.Fatalf("Peers in group 2 should become unsolved after the delegate being removed")
	}
	groups.ResolveDelegates(prs.Progress)
	_, targets = next_delegate_and_targets(&groups)
	if len(targets) != 1 {
		t.Fatalf("The delegate must be picked even if there're only 2 peers in group2")
	}

	// Add the removed peer back, but without group id
	readded := delegate
	groups.UpdateGroup(readded, None)
	if len(groups.unresolvedPeers) != 0 {
		t.Fatalf("UpdateGroup with a invalid group id will be ignored")
	}

	// Add the removed peer back with group id
	groups.UpdateGroup(readded, 2)
	delegate, targets = next_delegate_and_targets(&groups)
	if len(targets) != 2 {
		t.Fatalf("The peer should be included in groups system after invalid UpdateGroup")
	}
	if delegate == readded {
		t.Fatalf("A new added peer should never changes the delegate in current group")
	}

	// Remove the peer from group system if it joins again with an invalid group id
	groups.UpdateGroup(readded, None)
	_, targets = next_delegate_and_targets(&groups)
	if len(targets) != 1 {
		t.Fatalf("An already delegated peer can be removed")
	}

	// Move delegate peer to group 3. Both expectations must hold: peers are
	// reported as unresolved AND no delegate is left, so the check fails as
	// soon as either one is violated.
	moved := delegate
	if !groups.UpdateGroup(moved, 3) || len(groups.BcastTargets) != 0 {
		t.Fatalf("The delegated peers should become unsolved if the delegate's group is updated")
	}
}
3 changes: 2 additions & 1 deletion raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,11 @@ func TestNodeReadIndexToOldLeader(t *testing.T) {
if len(r1.msgs) != 2 {
t.Fatalf("len(r1.msgs) expected 1, got %d", len(r1.msgs))
}
readIndxMsg3 := raftpb.Message{From: 1, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries}
readIndxMsg3 := raftpb.Message{From: 2, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries}
if !reflect.DeepEqual(r1.msgs[0], readIndxMsg3) {
t.Fatalf("r1.msgs[0] expected %+v, got %+v", readIndxMsg3, r1.msgs[0])
}
readIndxMsg3 = raftpb.Message{From: 3, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries}
if !reflect.DeepEqual(r1.msgs[1], readIndxMsg3) {
t.Fatalf("r1.msgs[1] expected %+v, got %+v", readIndxMsg3, r1.msgs[1])
}
Expand Down
Loading