raft: Allow Join to be called multiple times for the same cluster member #2198

Merged 2 commits on Jul 20, 2017
manager/manager.go (7 changes: 6 additions & 1 deletion)
@@ -86,7 +86,11 @@ type Config struct {
// cluster to join.
JoinRaft string

// Top-level state directory
// ForceJoin causes us to invoke raft's Join RPC even if already part
// of a cluster.
ForceJoin bool

// StateDir is the top-level state directory
StateDir string

// ForceNewCluster defines if we have to force a new cluster
@@ -201,6 +205,7 @@ func New(config *Config) (*Manager, error) {
newNodeOpts := raft.NodeOptions{
ID: config.SecurityConfig.ClientTLSCreds.NodeID(),
JoinAddr: config.JoinRaft,
ForceJoin: config.ForceJoin,
Config: raftCfg,
StateDir: raftStateDir,
ForceNewCluster: config.ForceNewCluster,
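The manager change above is plumbing only: the new ForceJoin field on the public Config is copied into raft's NodeOptions when the node is created. A minimal, hypothetical sketch of a caller setting it (the address and state directory are made-up example values, and the other required Config fields such as SecurityConfig, RemoteAPI, and ControlAPI are left at zero values, so this only shows where the flag lives, not a working manager setup):

package main

import (
    "fmt"

    "github.com/docker/swarmkit/manager"
)

func main() {
    // Hypothetical values; this Config is deliberately incomplete and is not
    // passed to manager.New here.
    cfg := manager.Config{
        JoinRaft:  "10.0.0.1:2377", // existing manager to (re)join (example address)
        ForceJoin: true,            // send raft's Join RPC even though local raft state exists
        StateDir:  "/var/lib/swarmd",
    }
    fmt.Printf("join %s (forced: %v)\n", cfg.JoinRaft, cfg.ForceJoin)
}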
manager/state/raft/raft.go (96 changes: 65 additions & 31 deletions)
@@ -166,6 +168,8 @@ type NodeOptions struct {
// JoinAddr is the cluster to join. May be an empty string to create
// a standalone cluster.
JoinAddr string
// ForceJoin tells us to join even if already part of a cluster.
ForceJoin bool
// Config is the raft config.
Config *raft.Config
// StateDir is the directory to store durable state.
@@ -393,16 +395,17 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {

// restore from snapshot
if loadAndStartErr == nil {
if n.opts.JoinAddr != "" {
log.G(ctx).Warning("ignoring request to join cluster, because raft state already exists")
if n.opts.JoinAddr != "" && n.opts.ForceJoin {
if err := n.joinCluster(ctx); err != nil {
return errors.Wrap(err, "failed to rejoin cluster")
}
}
n.campaignWhenAble = true
n.initTransport()
n.raftNode = raft.RestartNode(n.Config)
return nil
}

// first member of cluster
if n.opts.JoinAddr == "" {
// First member in the cluster, self-assign ID
n.Config.ID = uint64(rand.Int63()) + 1
@@ -417,6 +420,22 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
}

// join to existing cluster

if err := n.joinCluster(ctx); err != nil {
return err
}

if _, err := n.newRaftLogs(n.opts.ID); err != nil {
return err
}

n.initTransport()
n.raftNode = raft.StartNode(n.Config, nil)

return nil
}

func (n *Node) joinCluster(ctx context.Context) error {
if n.opts.Addr == "" {
return errors.New("attempted to join raft cluster without knowing own address")
}
@@ -438,15 +457,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
}

n.Config.ID = resp.RaftID

if _, err := n.newRaftLogs(n.opts.ID); err != nil {
return err
}
n.bootstrapMembers = resp.Members

n.initTransport()
n.raftNode = raft.StartNode(n.Config, nil)

return nil
}

@@ -909,24 +920,6 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrLostLeadership.Error())
}

// A single manager must not be able to join the raft cluster twice. If
// it did, that would cause the quorum to be computed incorrectly. This
// could happen if the WAL was deleted from an active manager.
for _, m := range n.cluster.Members() {
if m.NodeID == nodeInfo.NodeID {
return nil, grpc.Errorf(codes.AlreadyExists, "%s", "a raft member with this node ID already exists")
}
}

// Find a unique ID for the joining member.
var raftID uint64
for {
raftID = uint64(rand.Int63()) + 1
if n.cluster.GetMember(raftID) == nil && !n.cluster.IsIDRemoved(raftID) {
break
}
}

remoteAddr := req.Addr

// If the joining node sent an address like 0.0.0.0:4242, automatically
@@ -953,12 +946,54 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
return nil, err
}

// If the peer is already a member of the cluster, we will only update
// its information, not add it as a new member. Adding it again would
// cause the quorum to be computed incorrectly.
for _, m := range n.cluster.Members() {
if m.NodeID == nodeInfo.NodeID {
if remoteAddr == m.Addr {
return n.joinResponse(m.RaftID), nil
}
updatedRaftMember := &api.RaftMember{
RaftID: m.RaftID,
NodeID: m.NodeID,
Addr: remoteAddr,
}
if err := n.cluster.UpdateMember(m.RaftID, updatedRaftMember); err != nil {
return nil, err
}

if err := n.updateNodeBlocking(ctx, m.RaftID, remoteAddr); err != nil {
log.WithError(err).Error("failed to update node address")
return nil, err
}

log.Info("updated node address")
return n.joinResponse(m.RaftID), nil
}
}

// Find a unique ID for the joining member.
var raftID uint64
for {
raftID = uint64(rand.Int63()) + 1
if n.cluster.GetMember(raftID) == nil && !n.cluster.IsIDRemoved(raftID) {
break
}
}

err = n.addMember(ctx, remoteAddr, raftID, nodeInfo.NodeID)
if err != nil {
log.WithError(err).Errorf("failed to add member %x", raftID)
return nil, err
}

log.Debug("node joined")

return n.joinResponse(raftID), nil
}

func (n *Node) joinResponse(raftID uint64) *api.JoinResponse {
var nodes []*api.RaftMember
for _, node := range n.cluster.Members() {
nodes = append(nodes, &api.RaftMember{
@@ -967,9 +1002,8 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
Addr: node.Addr,
})
}
log.Debugf("node joined")

return &api.JoinResponse{Members: nodes, RaftID: raftID}, nil
return &api.JoinResponse{Members: nodes, RaftID: raftID}
}

// checkHealth tries to contact an aspiring member through its advertised address
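The behavioral core of this PR is in the Join handler above: a node ID that is already in the member list is no longer rejected with codes.AlreadyExists; the leader reuses the member's existing raft ID and, if the advertised address changed, updates it before answering. A self-contained sketch of that decision, using simplified stand-in types rather than swarmkit's actual cluster API:

package main

import "fmt"

// member is a simplified stand-in for swarmkit's api.RaftMember.
type member struct {
    RaftID uint64
    NodeID string
    Addr   string
}

// joinExisting mirrors the new Join behaviour for an already-known node:
// reuse its raft ID, and record whether its address needs updating.
// found is false when the node is not a member yet, in which case the caller
// would fall through to allocating a fresh, unique raft ID.
func joinExisting(members map[uint64]*member, nodeID, remoteAddr string) (raftID uint64, updated, found bool) {
    for _, m := range members {
        if m.NodeID != nodeID {
            continue
        }
        if m.Addr != remoteAddr {
            // In swarmkit this goes through cluster.UpdateMember plus a
            // blocking raft configuration change, not a direct write.
            m.Addr = remoteAddr
            updated = true
        }
        return m.RaftID, updated, true
    }
    return 0, false, false
}

func main() {
    members := map[uint64]*member{
        7: {RaftID: 7, NodeID: "node-3", Addr: "127.0.0.1:4242"},
    }

    // Same node joins again from a new address: keep raft ID 7, update the address.
    fmt.Println(joinExisting(members, "node-3", "127.0.0.1:5000")) // 7 true true

    // Unknown node: caller allocates a new raft ID instead.
    fmt.Println(joinExisting(members, "node-9", "127.0.0.1:6000")) // 0 false false
}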
manager/state/raft/raft_test.go (35 changes: 29 additions & 6 deletions)
@@ -6,14 +6,14 @@ import (
"io/ioutil"
"log"
"math/rand"
"net"
"os"
"reflect"
"strconv"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"

"golang.org/x/net/context"
@@ -88,9 +88,19 @@ func dial(n *raftutils.TestNode, addr string) (*grpc.ClientConn, error) {
func TestRaftJoinTwice(t *testing.T) {
t.Parallel()

nodes, _ := raftutils.NewRaftCluster(t, tc)
nodes, clockSource := raftutils.NewRaftCluster(t, tc)
defer raftutils.TeardownCluster(nodes)

// Node 3's address changes
nodes[3].Server.Stop()
nodes[3].ShutdownRaft()
nodes[3].Listener.CloseListener()

l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "can't bind to raft service port")
nodes[3].Listener = raftutils.NewWrappedListener(l)
nodes[3] = raftutils.RestartNode(t, clockSource, nodes[3], false)

// Node 3 tries to join again
// Use gRPC instead of calling handler directly because of
// authorization check.
@@ -99,10 +109,23 @@ func TestRaftJoinTwice(t *testing.T) {
raftClient := api.NewRaftMembershipClient(cc)
defer cc.Close()
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
_, err = raftClient.Join(ctx, &api.JoinRequest{})
assert.Error(t, err, "expected error on duplicate Join")
assert.Equal(t, grpc.Code(err), codes.AlreadyExists)
assert.Equal(t, grpc.ErrorDesc(err), "a raft member with this node ID already exists")
_, err = raftClient.Join(ctx, &api.JoinRequest{Addr: l.Addr().String()})
assert.NoError(t, err)

// Propose a value and wait for it to propagate
value, err := raftutils.ProposeValue(t, nodes[1], DefaultProposalTime)
assert.NoError(t, err, "failed to propose value")
raftutils.CheckValue(t, clockSource, nodes[2], value)

// Restart node 2
nodes[2].Server.Stop()
nodes[2].ShutdownRaft()
nodes[2] = raftutils.RestartNode(t, clockSource, nodes[2], false)
raftutils.WaitForCluster(t, clockSource, nodes)

// Node 2 should have the updated address for node 3 in its member list
require.NotNil(t, nodes[2].GetMemberlist()[nodes[3].Config.ID])
require.Equal(t, l.Addr().String(), nodes[2].GetMemberlist()[nodes[3].Config.ID].Addr)
}

func TestRaftLeader(t *testing.T) {
manager/state/raft/testutils/testutils.go (8 changes: 4 additions & 4 deletions)
@@ -159,8 +159,8 @@ func (l *WrappedListener) Close() error {
return nil
}

// CloseListener closes the listener
func (l *WrappedListener) close() error {
// CloseListener closes the underlying listener
func (l *WrappedListener) CloseListener() error {
return l.Listener.Close()
}

@@ -471,7 +471,7 @@ func ShutdownNode(node *TestNode) {
<-node.Done()
}
os.RemoveAll(node.StateDir)
node.Listener.close()
node.Listener.CloseListener()
}

// ShutdownRaft shutdowns only raft part of node.
@@ -487,7 +487,7 @@ func CleanupNonRunningNode(node *TestNode) {
func CleanupNonRunningNode(node *TestNode) {
node.Server.Stop()
os.RemoveAll(node.StateDir)
node.Listener.close()
node.Listener.CloseListener()
}

// Leader determines who is the leader amongst a set of raft nodes
node/node.go (12 changes: 10 additions & 2 deletions)
@@ -828,14 +828,22 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
}
}

remoteAddr, _ := n.remotes.Select(n.NodeID())
joinAddr := n.config.JoinAddr
if joinAddr == "" {
remoteAddr, err := n.remotes.Select(n.NodeID())
if err == nil {
joinAddr = remoteAddr.Addr
}
}

m, err := manager.New(&manager.Config{
ForceNewCluster: n.config.ForceNewCluster,
RemoteAPI: remoteAPI,
ControlAPI: n.config.ListenControlAPI,
SecurityConfig: securityConfig,
ExternalCAs: n.config.ExternalCAs,
JoinRaft: remoteAddr.Addr,
JoinRaft: joinAddr,
ForceJoin: n.config.JoinAddr != "",
StateDir: n.config.StateDir,
HeartbeatTick: n.config.HeartbeatTick,
ElectionTick: n.config.ElectionTick,
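In node.go, an explicitly configured join address now takes priority and also sets ForceJoin, while an address discovered from the remote list is only a fallback and may legitimately be absent. A small standalone sketch of that selection, with selectRemote as a hypothetical stand-in for n.remotes.Select:

package main

import (
    "errors"
    "fmt"
)

// chooseJoinAddr mirrors the new logic in runManager: an explicitly
// configured join address wins and also forces a rejoin; otherwise fall back
// to a discovered remote manager, tolerating the case where none is known
// yet (e.g. the very first manager of a new cluster).
func chooseJoinAddr(configured string, selectRemote func() (string, error)) (joinAddr string, forceJoin bool) {
    if configured != "" {
        return configured, true
    }
    if addr, err := selectRemote(); err == nil {
        return addr, false
    }
    return "", false
}

func main() {
    noRemotes := func() (string, error) { return "", errors.New("no remote managers known") }
    known := func() (string, error) { return "10.0.0.2:2377", nil }

    fmt.Println(chooseJoinAddr("10.0.0.1:2377", known)) // 10.0.0.1:2377 true  (explicitly configured JoinAddr)
    fmt.Println(chooseJoinAddr("", known))              // 10.0.0.2:2377 false (discovered remote)
    fmt.Println(chooseJoinAddr("", noRemotes))          // "" false            (nothing to join yet)
}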