Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

support for api/ raft-add-peer and raft-remove-peer #1208

Merged
merged 3 commits into from
Jul 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions go/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2847,6 +2847,46 @@ func (this *HttpAPI) Reelect(params martini.Params, r render.Render, req *http.R
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Set re-elections")})
}

// RaftAddPeer adds a new node to the raft cluster
func (this *HttpAPI) RaftAddPeer(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
}
if !orcraft.IsRaftEnabled() {
Respond(r, &APIResponse{Code: ERROR, Message: "raft-add-peer: not running with raft setup"})
return
}
addr, err := orcraft.AddPeer(params["addr"])

if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("Cannot add raft peer: %+v", err)})
return
}

r.JSON(http.StatusOK, addr)
}

// RaftAddPeer removes a node fro the raft cluster
func (this *HttpAPI) RaftRemovePeer(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
}
if !orcraft.IsRaftEnabled() {
Respond(r, &APIResponse{Code: ERROR, Message: "raft-remove-peer: not running with raft setup"})
return
}
addr, err := orcraft.RemovePeer(params["addr"])

if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("Cannot remove raft peer: %+v", err)})
return
}

r.JSON(http.StatusOK, addr)
}

// RaftYield yields to a specified host
func (this *HttpAPI) RaftYield(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Expand Down Expand Up @@ -2927,6 +2967,40 @@ func (this *HttpAPI) RaftHealth(params martini.Params, r render.Render, req *htt
r.JSON(http.StatusOK, "healthy")
}

// RaftStatus exports a status summary for a raft node
func (this *HttpAPI) RaftStatus(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !orcraft.IsRaftEnabled() {
Respond(r, &APIResponse{Code: ERROR, Message: "raft-state: not running with raft setup"})
return
}
peers, err := orcraft.GetPeers()
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("Cannot get raft peers: %+v", err)})
return
}

status := struct {
RaftBind string
RaftAdvertise string
State string
Healthy bool
IsPartOfQuorum bool
Leader string
LeaderURI string
Peers []string
}{
RaftBind: orcraft.GetRaftBind(),
RaftAdvertise: orcraft.GetRaftAdvertise(),
State: orcraft.GetState().String(),
Healthy: orcraft.IsHealthy(),
IsPartOfQuorum: orcraft.IsPartOfQuorum(),
Leader: orcraft.GetLeader(),
LeaderURI: orcraft.LeaderURI.Get(),
Peers: peers,
}
r.JSON(http.StatusOK, status)
}

// RaftFollowerHealthReport is initiated by followers to report their identity and health to the raft leader.
func (this *HttpAPI) RaftFollowerHealthReport(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !orcraft.IsRaftEnabled() {
Expand Down Expand Up @@ -3802,12 +3876,15 @@ func (this *HttpAPI) RegisterRequests(m *martini.ClassicMartini) {
this.registerAPIRequestNoProxy(m, "leader-check", this.LeaderCheck)
this.registerAPIRequestNoProxy(m, "leader-check/:errorStatusCode", this.LeaderCheck)
this.registerAPIRequestNoProxy(m, "grab-election", this.GrabElection)
this.registerAPIRequest(m, "raft-add-peer/:addr", this.RaftAddPeer) // delegated to the raft leader
this.registerAPIRequest(m, "raft-remove-peer/:addr", this.RaftRemovePeer) // delegated to the raft leader
this.registerAPIRequestNoProxy(m, "raft-yield/:node", this.RaftYield)
this.registerAPIRequestNoProxy(m, "raft-yield-hint/:hint", this.RaftYieldHint)
this.registerAPIRequestNoProxy(m, "raft-peers", this.RaftPeers)
this.registerAPIRequestNoProxy(m, "raft-state", this.RaftState)
this.registerAPIRequestNoProxy(m, "raft-leader", this.RaftLeader)
this.registerAPIRequestNoProxy(m, "raft-health", this.RaftHealth)
this.registerAPIRequestNoProxy(m, "raft-status", this.RaftStatus)
this.registerAPIRequestNoProxy(m, "raft-snapshot", this.RaftSnapshot)
this.registerAPIRequestNoProxy(m, "raft-follower-health-report/:authenticationToken/:raftBind/:raftAdvertise", this.RaftFollowerHealthReport)
this.registerAPIRequestNoProxy(m, "reload-configuration", this.ReloadConfiguration)
Expand Down
31 changes: 30 additions & 1 deletion go/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
raftTimeout = 10 * time.Second
)

var RaftNotRunning = fmt.Errorf("raft is not configured/running")
var RaftNotRunning error = fmt.Errorf("raft is not configured/running")
var store *Store
var raftSetupComplete int64
var ThisHostname string
Expand Down Expand Up @@ -215,6 +215,9 @@ func normalizeRaftNode(node string) (string, error) {
// data and opinion are trustworthy.
// Comapre that to a node which has left (or has not yet joined) the quorum: it has stale data.
func IsPartOfQuorum() bool {
if GetLeader() == "" {
return false
}
state := GetState()
return state == raft.Leader || state == raft.Follower
}
Expand Down Expand Up @@ -281,6 +284,14 @@ func Yield() error {
return getRaft().Yield()
}

func GetRaftBind() string {
return store.raftBind
}

func GetRaftAdvertise() string {
return store.raftAdvertise
}

func GetPeers() ([]string, error) {
if !IsRaftEnabled() {
return []string{}, RaftNotRunning
Expand All @@ -307,6 +318,24 @@ func PublishCommand(op string, value interface{}) (response interface{}, err err
return store.genericCommand(op, b)
}

func AddPeer(addr string) (response interface{}, err error) {
addr, err = normalizeRaftNode(addr)
if err != nil {
return "", err
}
err = store.AddPeer(addr)
return addr, err
}

func RemovePeer(addr string) (response interface{}, err error) {
addr, err = normalizeRaftNode(addr)
if err != nil {
return "", err
}
err = store.RemovePeer(addr)
return addr, err
}

func PublishYield(toPeer string) (response interface{}, err error) {
toPeer, err = normalizeRaftNode(toPeer)
if err != nil {
Expand Down
19 changes: 16 additions & 3 deletions go/raft/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (store *Store) Open(peerNodes []string) error {
config := raft.DefaultConfig()
config.SnapshotThreshold = 1
config.SnapshotInterval = snapshotInterval
config.ShutdownOnRemove = false

// Setup Raft communication.
advertise, err := net.ResolveTCPAddr("tcp", store.raftAdvertise)
Expand Down Expand Up @@ -116,10 +117,10 @@ func (store *Store) Open(peerNodes []string) error {
return nil
}

// Join joins a node, located at addr, to this store. The node must be ready to
// AddPeer adds a node, located at addr, to this store. The node must be ready to
// respond to Raft communications at that address.
func (store *Store) Join(addr string) error {
log.Infof("received join request for remote node as %s", addr)
func (store *Store) AddPeer(addr string) error {
log.Infof("received join request for remote node %s", addr)

f := store.raft.AddPeer(addr)
if f.Error() != nil {
Expand All @@ -129,6 +130,18 @@ func (store *Store) Join(addr string) error {
return nil
}

// RemovePeer removes a node from this raft setup
func (store *Store) RemovePeer(addr string) error {
log.Infof("received remove request for remote node %s", addr)

f := store.raft.RemovePeer(addr)
if f.Error() != nil {
return f.Error()
}
log.Infof("node at %s removed successfully", addr)
return nil
}

// genericCommand requests consensus for applying a single command.
// This is an internal orchestrator implementation
func (store *Store) genericCommand(op string, bytes []byte) (response interface{}, err error) {
Expand Down