diff --git a/go/http/api.go b/go/http/api.go index 93085ccc4..cc677e1a8 100644 --- a/go/http/api.go +++ b/go/http/api.go @@ -2847,6 +2847,46 @@ func (this *HttpAPI) Reelect(params martini.Params, r render.Render, req *http.R Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Set re-elections")}) } +// RaftAddPeer adds a new node to the raft cluster +func (this *HttpAPI) RaftAddPeer(params martini.Params, r render.Render, req *http.Request, user auth.User) { + if !isAuthorizedForAction(req, user) { + Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"}) + return + } + if !orcraft.IsRaftEnabled() { + Respond(r, &APIResponse{Code: ERROR, Message: "raft-add-peer: not running with raft setup"}) + return + } + addr, err := orcraft.AddPeer(params["addr"]) + + if err != nil { + Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("Cannot add raft peer: %+v", err)}) + return + } + + r.JSON(http.StatusOK, addr) +} + +// RaftAddPeer removes a node fro the raft cluster +func (this *HttpAPI) RaftRemovePeer(params martini.Params, r render.Render, req *http.Request, user auth.User) { + if !isAuthorizedForAction(req, user) { + Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"}) + return + } + if !orcraft.IsRaftEnabled() { + Respond(r, &APIResponse{Code: ERROR, Message: "raft-remove-peer: not running with raft setup"}) + return + } + addr, err := orcraft.RemovePeer(params["addr"]) + + if err != nil { + Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("Cannot remove raft peer: %+v", err)}) + return + } + + r.JSON(http.StatusOK, addr) +} + // RaftYield yields to a specified host func (this *HttpAPI) RaftYield(params martini.Params, r render.Render, req *http.Request, user auth.User) { if !isAuthorizedForAction(req, user) { @@ -2927,6 +2967,40 @@ func (this *HttpAPI) RaftHealth(params martini.Params, r render.Render, req *htt r.JSON(http.StatusOK, "healthy") } +// RaftStatus exports a status summary for a raft node +func (this *HttpAPI) RaftStatus(params martini.Params, r render.Render, req *http.Request, user auth.User) { + if !orcraft.IsRaftEnabled() { + Respond(r, &APIResponse{Code: ERROR, Message: "raft-state: not running with raft setup"}) + return + } + peers, err := orcraft.GetPeers() + if err != nil { + Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("Cannot get raft peers: %+v", err)}) + return + } + + status := struct { + RaftBind string + RaftAdvertise string + State string + Healthy bool + IsPartOfQuorum bool + Leader string + LeaderURI string + Peers []string + }{ + RaftBind: orcraft.GetRaftBind(), + RaftAdvertise: orcraft.GetRaftAdvertise(), + State: orcraft.GetState().String(), + Healthy: orcraft.IsHealthy(), + IsPartOfQuorum: orcraft.IsPartOfQuorum(), + Leader: orcraft.GetLeader(), + LeaderURI: orcraft.LeaderURI.Get(), + Peers: peers, + } + r.JSON(http.StatusOK, status) +} + // RaftFollowerHealthReport is initiated by followers to report their identity and health to the raft leader. func (this *HttpAPI) RaftFollowerHealthReport(params martini.Params, r render.Render, req *http.Request, user auth.User) { if !orcraft.IsRaftEnabled() { @@ -3802,12 +3876,15 @@ func (this *HttpAPI) RegisterRequests(m *martini.ClassicMartini) { this.registerAPIRequestNoProxy(m, "leader-check", this.LeaderCheck) this.registerAPIRequestNoProxy(m, "leader-check/:errorStatusCode", this.LeaderCheck) this.registerAPIRequestNoProxy(m, "grab-election", this.GrabElection) + this.registerAPIRequest(m, "raft-add-peer/:addr", this.RaftAddPeer) // delegated to the raft leader + this.registerAPIRequest(m, "raft-remove-peer/:addr", this.RaftRemovePeer) // delegated to the raft leader this.registerAPIRequestNoProxy(m, "raft-yield/:node", this.RaftYield) this.registerAPIRequestNoProxy(m, "raft-yield-hint/:hint", this.RaftYieldHint) this.registerAPIRequestNoProxy(m, "raft-peers", this.RaftPeers) this.registerAPIRequestNoProxy(m, "raft-state", this.RaftState) this.registerAPIRequestNoProxy(m, "raft-leader", this.RaftLeader) this.registerAPIRequestNoProxy(m, "raft-health", this.RaftHealth) + this.registerAPIRequestNoProxy(m, "raft-status", this.RaftStatus) this.registerAPIRequestNoProxy(m, "raft-snapshot", this.RaftSnapshot) this.registerAPIRequestNoProxy(m, "raft-follower-health-report/:authenticationToken/:raftBind/:raftAdvertise", this.RaftFollowerHealthReport) this.registerAPIRequestNoProxy(m, "reload-configuration", this.ReloadConfiguration) diff --git a/go/raft/raft.go b/go/raft/raft.go index ac6aa8fb3..dcb8cca2d 100644 --- a/go/raft/raft.go +++ b/go/raft/raft.go @@ -46,7 +46,7 @@ const ( raftTimeout = 10 * time.Second ) -var RaftNotRunning = fmt.Errorf("raft is not configured/running") +var RaftNotRunning error = fmt.Errorf("raft is not configured/running") var store *Store var raftSetupComplete int64 var ThisHostname string @@ -215,6 +215,9 @@ func normalizeRaftNode(node string) (string, error) { // data and opinion are trustworthy. // Comapre that to a node which has left (or has not yet joined) the quorum: it has stale data. func IsPartOfQuorum() bool { + if GetLeader() == "" { + return false + } state := GetState() return state == raft.Leader || state == raft.Follower } @@ -281,6 +284,14 @@ func Yield() error { return getRaft().Yield() } +func GetRaftBind() string { + return store.raftBind +} + +func GetRaftAdvertise() string { + return store.raftAdvertise +} + func GetPeers() ([]string, error) { if !IsRaftEnabled() { return []string{}, RaftNotRunning @@ -307,6 +318,24 @@ func PublishCommand(op string, value interface{}) (response interface{}, err err return store.genericCommand(op, b) } +func AddPeer(addr string) (response interface{}, err error) { + addr, err = normalizeRaftNode(addr) + if err != nil { + return "", err + } + err = store.AddPeer(addr) + return addr, err +} + +func RemovePeer(addr string) (response interface{}, err error) { + addr, err = normalizeRaftNode(addr) + if err != nil { + return "", err + } + err = store.RemovePeer(addr) + return addr, err +} + func PublishYield(toPeer string) (response interface{}, err error) { toPeer, err = normalizeRaftNode(toPeer) if err != nil { diff --git a/go/raft/store.go b/go/raft/store.go index fbe76b857..fb0257771 100644 --- a/go/raft/store.go +++ b/go/raft/store.go @@ -48,6 +48,7 @@ func (store *Store) Open(peerNodes []string) error { config := raft.DefaultConfig() config.SnapshotThreshold = 1 config.SnapshotInterval = snapshotInterval + config.ShutdownOnRemove = false // Setup Raft communication. advertise, err := net.ResolveTCPAddr("tcp", store.raftAdvertise) @@ -116,10 +117,10 @@ func (store *Store) Open(peerNodes []string) error { return nil } -// Join joins a node, located at addr, to this store. The node must be ready to +// AddPeer adds a node, located at addr, to this store. The node must be ready to // respond to Raft communications at that address. -func (store *Store) Join(addr string) error { - log.Infof("received join request for remote node as %s", addr) +func (store *Store) AddPeer(addr string) error { + log.Infof("received join request for remote node %s", addr) f := store.raft.AddPeer(addr) if f.Error() != nil { @@ -129,6 +130,18 @@ func (store *Store) Join(addr string) error { return nil } +// RemovePeer removes a node from this raft setup +func (store *Store) RemovePeer(addr string) error { + log.Infof("received remove request for remote node %s", addr) + + f := store.raft.RemovePeer(addr) + if f.Error() != nil { + return f.Error() + } + log.Infof("node at %s removed successfully", addr) + return nil +} + // genericCommand requests consensus for applying a single command. // This is an internal orchestrator implementation func (store *Store) genericCommand(op string, bytes []byte) (response interface{}, err error) {