Skip to content

Commit

Permalink
raft: Allow Join to be called when already part of a cluster
Browse files: browse the repository at this point in the history
Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
  • Loading branch information
aaronlehmann committed Jul 12, 2017
1 parent 4963ae4 commit 10e1838
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 14 deletions.
7 changes: 6 additions & 1 deletion manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ type Config struct {
// cluster to join.
JoinRaft string

- // Top-level state directory
+ // ForceJoin causes us to invoke raft's Join RPC even if already part
+ // of a cluster.
+ ForceJoin bool
+
+ // StateDir is the top-level state directory
StateDir string

// ForceNewCluster defines if we have to force a new cluster
Expand Down Expand Up @@ -201,6 +205,7 @@ func New(config *Config) (*Manager, error) {
newNodeOpts := raft.NodeOptions{
ID: config.SecurityConfig.ClientTLSCreds.NodeID(),
JoinAddr: config.JoinRaft,
+ ForceJoin: config.ForceJoin,
Config: raftCfg,
StateDir: raftStateDir,
ForceNewCluster: config.ForceNewCluster,
Expand Down
33 changes: 22 additions & 11 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ type NodeOptions struct {
// JoinAddr is the cluster to join. May be an empty string to create
// a standalone cluster.
JoinAddr string
+ // ForceJoin tells us to join even if already part of a cluster.
+ ForceJoin bool
// Config is the raft config.
Config *raft.Config
// StateDir is the directory to store durable state.
Expand Down Expand Up @@ -393,16 +395,17 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {

// restore from snapshot
if loadAndStartErr == nil {
- if n.opts.JoinAddr != "" {
- log.G(ctx).Warning("ignoring request to join cluster, because raft state already exists")
+ if n.opts.JoinAddr != "" && n.opts.ForceJoin {
+ if err := n.joinCluster(ctx); err != nil {
+ return errors.Wrap(err, "failed to rejoin cluster")
+ }
}
n.campaignWhenAble = true
n.initTransport()
n.raftNode = raft.RestartNode(n.Config)
return nil
}

- // first member of cluster
if n.opts.JoinAddr == "" {
// First member in the cluster, self-assign ID
n.Config.ID = uint64(rand.Int63()) + 1
Expand All @@ -417,6 +420,22 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
}

// join to existing cluster

+ if err := n.joinCluster(ctx); err != nil {
+ return err
+ }
+
+ if _, err := n.newRaftLogs(n.opts.ID); err != nil {
+ return err
+ }
+
+ n.initTransport()
+ n.raftNode = raft.StartNode(n.Config, nil)
+
+ return nil
+ }
+
+ func (n *Node) joinCluster(ctx context.Context) error {
if n.opts.Addr == "" {
return errors.New("attempted to join raft cluster without knowing own address")
}
Expand All @@ -438,15 +457,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
}

n.Config.ID = resp.RaftID

- if _, err := n.newRaftLogs(n.opts.ID); err != nil {
- return err
- }
n.bootstrapMembers = resp.Members
-
- n.initTransport()
- n.raftNode = raft.StartNode(n.Config, nil)

return nil
}

Expand Down
12 changes: 10 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,14 +828,22 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
}
}

- remoteAddr, _ := n.remotes.Select(n.NodeID())
+ joinAddr := n.config.JoinAddr
+ if joinAddr == "" {
+ remoteAddr, err := n.remotes.Select(n.NodeID())
+ if err == nil {
+ joinAddr = remoteAddr.Addr
+ }
+ }

m, err := manager.New(&manager.Config{
ForceNewCluster: n.config.ForceNewCluster,
RemoteAPI: remoteAPI,
ControlAPI: n.config.ListenControlAPI,
SecurityConfig: securityConfig,
ExternalCAs: n.config.ExternalCAs,
- JoinRaft: remoteAddr.Addr,
+ JoinRaft: joinAddr,
+ ForceJoin: n.config.JoinAddr != "",
StateDir: n.config.StateDir,
HeartbeatTick: n.config.HeartbeatTick,
ElectionTick: n.config.ElectionTick,
Expand Down

0 comments on commit 10e1838

Please sign in to comment.