This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Merge pull request #408 from github/raft-reverse-proxy
Raft reverse proxy API calls to leader
Shlomi Noach authored Feb 15, 2018
2 parents 9fa01b6 + 9a42db4 commit 04f0e2a
Showing 14 changed files with 444 additions and 238 deletions.
6 changes: 4 additions & 2 deletions docs/deployment-raft.md
Expand Up @@ -28,8 +28,10 @@ You may choose between using `MySQL` and `SQLite`. See [backend configuration](c
As suggested, you may want to put the `orchestrator` service and the `MySQL` service on the same box. If using `SQLite` there's nothing else to do.

- Consider adding a proxy on top of the service boxes; the proxy would redirect all traffic to the leader node. There is one and only one leader node, and the status check endpoint is `/api/leader-check`.
- Clients must _only interact with the leader_. Setting up a proxy is one way to ensure that. See [proxy section](raft.md#proxy).
- Nothing should directly interact with a backend DB. Only the leader is capable of coordinating changes to the data with the other `raft` nodes.
- Clients may only interact with healthy raft nodes.
- The simplest approach is to interact only with the leader. Setting up a proxy is one way to ensure that. See [proxy: leader section](raft.md#proxy-leader).
- Otherwise all healthy raft nodes will reverse proxy your requests to the leader. See [proxy: healthy raft nodes section](raft.md#proxy-healthy-raft-nodes).
- Nothing should directly interact with a backend DB. Only the leader is capable of coordinating changes to the data with the other `raft` nodes.

- `orchestrator` nodes communicate between themselves on `DefaultRaftPort`. This port should be open to all `orchestrator` nodes, and no one else needs to have access to this port.

20 changes: 17 additions & 3 deletions docs/raft.md
Expand Up @@ -50,11 +50,11 @@ Each `orchestrator` node has its own, dedicated backend database server. This wo

`orchestrator` is bundled with `sqlite`; there is no need to install an external dependency.

#### Proxy
#### Proxy: leader

You should only send requests to the leader node.
Only the leader is allowed to make changes.

One way to achieve this is to set up an `HTTP` proxy (e.g. HAProxy) on top of the `orchestrator` services.
The simplest setup is to only route traffic to the leader, by setting up an `HTTP` proxy (e.g. HAProxy) on top of the `orchestrator` services.

> See [orchestrator-client](#orchestrator-client) section for an alternate approach
Expand Down Expand Up @@ -87,6 +87,20 @@ listen orchestrator
server orchestrator-node-2 orchestrator-node-2.fqdn.com:3000 check
```

#### Proxy: healthy raft nodes

A relaxation of the above constraint.

Healthy raft nodes will reverse proxy your requests to the leader. You may choose (and this happens to be desirable for `kubernetes` setups) to talk to any healthy raft member.

You _must not access unhealthy raft members, i.e. nodes that are isolated from the quorum_.

- Use `/api/raft-health` to identify that a node is part of a healthy raft group.
- An `HTTP 200/OK` response identifies the node as part of the healthy group, and you may direct traffic to the node.
- An `HTTP 500/Internal Server Error` response indicates the node is not part of a healthy group.
  Note that immediately following startup, and until a leader is elected, you may expect a period during which all nodes report as unhealthy.
  Note that upon leader re-election you may observe a brief period during which all nodes report as unhealthy.
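
The health check described above can be sketched from the client side as follows. This is a minimal illustration, not part of the commit: the helper names `isRaftHealthy` and `healthyNodes` are hypothetical, and the two `httptest` servers merely stand in for raft members that answer `/api/raft-health` with `200` and `500` respectively.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// isRaftHealthy reports whether the node at baseURL answers
// /api/raft-health with HTTP 200. Any error counts as unhealthy.
// (Hypothetical helper, not part of orchestrator.)
func isRaftHealthy(client *http.Client, baseURL string) bool {
	resp, err := client.Get(baseURL + "/api/raft-health")
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK
}

// healthyNodes keeps only the nodes accepted by probe, preserving order.
func healthyNodes(nodes []string, probe func(string) bool) []string {
	var healthy []string
	for _, n := range nodes {
		if probe(n) {
			healthy = append(healthy, n)
		}
	}
	return healthy
}

func main() {
	// Stand-in nodes: one reports healthy (200), one reports isolated (500).
	ok := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer ok.Close()
	isolated := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
	}))
	defer isolated.Close()

	client := &http.Client{Timeout: 2 * time.Second}
	probe := func(n string) bool { return isRaftHealthy(client, n) }
	fmt.Println("healthy nodes:", len(healthyNodes([]string{ok.URL, isolated.URL}, probe)))
}
```

Because all nodes may briefly report unhealthy during startup or re-election, a caller built along these lines would likely want to retry rather than treat an empty result as a permanent failure.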

#### orchestrator-client

An alternative to the proxy approach is to use `orchestrator-client`.
409 changes: 218 additions & 191 deletions go/http/api.go

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions go/http/raft_reverse_proxy.go
@@ -0,0 +1,35 @@
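package http

import (
	"net/http"
	"net/http/httputil"
	"net/url"

	"github.com/openark/golib/log"

	"github.com/github/orchestrator/go/raft"
	"github.com/go-martini/martini"
)

// raftReverseProxy forwards the request to the raft leader when this node is a
// non-leader raft member. In every other case it returns without writing a
// response, so the request is handled locally.
func raftReverseProxy(w http.ResponseWriter, r *http.Request, c martini.Context) {
	if !orcraft.IsRaftEnabled() {
		// No raft, so no reverse proxy to the leader
		return
	}
	if orcraft.IsLeader() {
		// I am the leader. I will handle the request directly.
		return
	}
	if orcraft.GetLeader() == "" {
		// No leader is known at this time, so there is nowhere to forward to.
		return
	}

	// Named leaderURL so the variable does not shadow the net/url package.
	leaderURL, err := url.Parse(orcraft.LeaderURI.Get())
	if err != nil {
		log.Errore(err)
		return
	}
	// Remove the client's Accept-Encoding header before forwarding.
	r.Header.Del("Accept-Encoding")
	proxy := httputil.NewSingleHostReverseProxy(leaderURL)
	proxy.ServeHTTP(w, r)
}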
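
To see the forwarding mechanism in isolation, here is a minimal sketch using only the standard library. It is an illustration under assumptions, not the commit's code: `forwardToLeader` and `proxyDemo` are hypothetical names, and two `httptest` servers play the roles of leader and follower.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"net/url"
)

// forwardToLeader returns a handler that proxies every request to leaderURI.
// (Hypothetical helper; it mirrors the url.Parse + NewSingleHostReverseProxy steps.)
func forwardToLeader(leaderURI string) (http.Handler, error) {
	target, err := url.Parse(leaderURI)
	if err != nil {
		return nil, err
	}
	proxy := httputil.NewSingleHostReverseProxy(target)
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		r.Header.Del("Accept-Encoding")
		proxy.ServeHTTP(w, r)
	}), nil
}

// proxyDemo sends a request to a stand-in follower and returns the body,
// which is produced by the stand-in leader.
func proxyDemo() (string, error) {
	leader := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "handled by leader")
	}))
	defer leader.Close()

	handler, err := forwardToLeader(leader.URL)
	if err != nil {
		return "", err
	}
	follower := httptest.NewServer(handler)
	defer follower.Close()

	resp, err := http.Get(follower.URL + "/api/clusters")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	body, err := proxyDemo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(body)
}
```

The client only ever talks to the follower; the response body nevertheless comes from the leader, which is the behaviour the documentation describes for healthy raft nodes.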
89 changes: 51 additions & 38 deletions go/http/web.go
Expand Up @@ -384,46 +384,59 @@ func (this *HttpWeb) Status(params martini.Params, r render.Render, req *http.Re
})
}

func (this *HttpWeb) registerWebRequest(m *martini.ClassicMartini, path string, handler martini.Handler) {
	fullPath := fmt.Sprintf("%s/web/%s", this.URLPrefix, path)
	if path == "/" {
		fullPath = fmt.Sprintf("%s/", this.URLPrefix)
	}

	if config.Config.RaftEnabled {
		m.Get(fullPath, raftReverseProxy, handler)
	} else {
		m.Get(fullPath, handler)
	}
}
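
The path computation in `registerWebRequest` can be exercised on its own. The sketch below extracts just that logic into a hypothetical helper, `fullWebPath`; the `/orc` prefix is an arbitrary example value.

```go
package main

import "fmt"

// fullWebPath reproduces the path logic of registerWebRequest:
// "/" maps to the prefix root, everything else lives under <prefix>/web/.
// (Hypothetical helper for illustration only.)
func fullWebPath(urlPrefix, path string) string {
	if path == "/" {
		return fmt.Sprintf("%s/", urlPrefix)
	}
	return fmt.Sprintf("%s/web/%s", urlPrefix, path)
}

func main() {
	for _, p := range []string{"status", "cluster/:clusterName", "", "/"} {
		fmt.Printf("%q -> %q\n", p, fullWebPath("/orc", p))
	}
}
```

Note that the empty path yields `<prefix>/web/` with a trailing slash, whereas the explicit registration it replaces used `<prefix>/web`. Whether the router treats the two as equivalent is not shown in this diff.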

// RegisterRequests makes for the de-facto list of known Web calls
func (this *HttpWeb) RegisterRequests(m *martini.ClassicMartini) {
m.Get(this.URLPrefix+"/web/access-token", this.AccessToken)
m.Get(this.URLPrefix+"/", this.Index)
m.Get(this.URLPrefix+"/web", this.Index)
m.Get(this.URLPrefix+"/web/home", this.About)
m.Get(this.URLPrefix+"/web/about", this.About)
m.Get(this.URLPrefix+"/web/keep-calm", this.KeepCalm)
m.Get(this.URLPrefix+"/web/faq", this.FAQ)
m.Get(this.URLPrefix+"/web/status", this.Status)
m.Get(this.URLPrefix+"/web/clusters", this.Clusters)
m.Get(this.URLPrefix+"/web/clusters-analysis", this.ClustersAnalysis)
m.Get(this.URLPrefix+"/web/cluster/:clusterName", this.Cluster)
m.Get(this.URLPrefix+"/web/cluster/alias/:clusterAlias", this.ClusterByAlias)
m.Get(this.URLPrefix+"/web/cluster/instance/:host/:port", this.ClusterByInstance)
m.Get(this.URLPrefix+"/web/cluster-pools/:clusterName", this.ClusterPools)
m.Get(this.URLPrefix+"/web/search/:searchString", this.Search)
m.Get(this.URLPrefix+"/web/search", this.Search)
m.Get(this.URLPrefix+"/web/discover", this.Discover)
m.Get(this.URLPrefix+"/web/long-queries", this.LongQueries)
m.Get(this.URLPrefix+"/web/audit", this.Audit)
m.Get(this.URLPrefix+"/web/audit/:page", this.Audit)
m.Get(this.URLPrefix+"/web/audit/instance/:host/:port", this.Audit)
m.Get(this.URLPrefix+"/web/audit/instance/:host/:port/:page", this.Audit)
m.Get(this.URLPrefix+"/web/audit-recovery", this.AuditRecovery)
m.Get(this.URLPrefix+"/web/audit-recovery/:page", this.AuditRecovery)
m.Get(this.URLPrefix+"/web/audit-recovery/id/:id", this.AuditRecovery)
m.Get(this.URLPrefix+"/web/audit-recovery/uid/:uid", this.AuditRecovery)
m.Get(this.URLPrefix+"/web/audit-recovery/cluster/:clusterName", this.AuditRecovery)
m.Get(this.URLPrefix+"/web/audit-recovery/cluster/:clusterName/:page", this.AuditRecovery)
m.Get(this.URLPrefix+"/web/audit-recovery/alias/:clusterAlias", this.AuditRecovery)
m.Get(this.URLPrefix+"/web/audit-failure-detection", this.AuditFailureDetection)
m.Get(this.URLPrefix+"/web/audit-failure-detection/:page", this.AuditFailureDetection)
m.Get(this.URLPrefix+"/web/audit-failure-detection/id/:id", this.AuditFailureDetection)
m.Get(this.URLPrefix+"/web/audit-failure-detection/alias/:clusterAlias", this.AuditFailureDetection)
m.Get(this.URLPrefix+"/web/audit-recovery-steps/:uid", this.AuditRecovery)
m.Get(this.URLPrefix+"/web/agents", this.Agents)
m.Get(this.URLPrefix+"/web/agent/:host", this.Agent)
m.Get(this.URLPrefix+"/web/seed-details/:seedId", this.AgentSeedDetails)
m.Get(this.URLPrefix+"/web/seeds", this.Seeds)
this.registerWebRequest(m, "access-token", this.AccessToken)
this.registerWebRequest(m, "", this.Index)
this.registerWebRequest(m, "/", this.Index)
this.registerWebRequest(m, "home", this.About)
this.registerWebRequest(m, "about", this.About)
this.registerWebRequest(m, "keep-calm", this.KeepCalm)
this.registerWebRequest(m, "faq", this.FAQ)
this.registerWebRequest(m, "status", this.Status)
this.registerWebRequest(m, "clusters", this.Clusters)
this.registerWebRequest(m, "clusters-analysis", this.ClustersAnalysis)
this.registerWebRequest(m, "cluster/:clusterName", this.Cluster)
this.registerWebRequest(m, "cluster/alias/:clusterAlias", this.ClusterByAlias)
this.registerWebRequest(m, "cluster/instance/:host/:port", this.ClusterByInstance)
this.registerWebRequest(m, "cluster-pools/:clusterName", this.ClusterPools)
this.registerWebRequest(m, "search/:searchString", this.Search)
this.registerWebRequest(m, "search", this.Search)
this.registerWebRequest(m, "discover", this.Discover)
this.registerWebRequest(m, "long-queries", this.LongQueries)
this.registerWebRequest(m, "audit", this.Audit)
this.registerWebRequest(m, "audit/:page", this.Audit)
this.registerWebRequest(m, "audit/instance/:host/:port", this.Audit)
this.registerWebRequest(m, "audit/instance/:host/:port/:page", this.Audit)
this.registerWebRequest(m, "audit-recovery", this.AuditRecovery)
this.registerWebRequest(m, "audit-recovery/:page", this.AuditRecovery)
this.registerWebRequest(m, "audit-recovery/id/:id", this.AuditRecovery)
this.registerWebRequest(m, "audit-recovery/uid/:uid", this.AuditRecovery)
this.registerWebRequest(m, "audit-recovery/cluster/:clusterName", this.AuditRecovery)
this.registerWebRequest(m, "audit-recovery/cluster/:clusterName/:page", this.AuditRecovery)
this.registerWebRequest(m, "audit-recovery/alias/:clusterAlias", this.AuditRecovery)
this.registerWebRequest(m, "audit-failure-detection", this.AuditFailureDetection)
this.registerWebRequest(m, "audit-failure-detection/:page", this.AuditFailureDetection)
this.registerWebRequest(m, "audit-failure-detection/id/:id", this.AuditFailureDetection)
this.registerWebRequest(m, "audit-failure-detection/alias/:clusterAlias", this.AuditFailureDetection)
this.registerWebRequest(m, "audit-recovery-steps/:uid", this.AuditRecovery)
this.registerWebRequest(m, "agents", this.Agents)
this.registerWebRequest(m, "agent/:host", this.Agent)
this.registerWebRequest(m, "seed-details/:seedId", this.AgentSeedDetails)
this.registerWebRequest(m, "seeds", this.Seeds)

this.RegisterDebug(m)
}
11 changes: 11 additions & 0 deletions go/logic/command_applier.go
Expand Up @@ -77,6 +77,8 @@ func (applier *CommandApplier) ApplyCommand(op string, value []byte) interface{}
return applier.enableGlobalRecoveries(value)
case "put-key-value":
return applier.putKeyValue(value)
case "leader-uri":
return applier.leaderURI(value)
}
return log.Errorf("Unknown command op: %s", op)
}
Expand Down Expand Up @@ -244,3 +246,12 @@ func (applier *CommandApplier) putKeyValue(value []byte) interface{} {
err := kv.PutKVPair(&kvPair)
return err
}

// leaderURI applies a "leader-uri" command: value is a JSON-encoded string.
func (applier *CommandApplier) leaderURI(value []byte) interface{} {
	var uri string
	if err := json.Unmarshal(value, &uri); err != nil {
		return log.Errore(err)
	}
	orcraft.LeaderURI.Set(uri)
	return nil
}
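
The applier above expects the command value to be a JSON-encoded string. The following sketch shows that round trip in isolation. It assumes the publishing side serializes the URI with `json.Marshal`, which this diff does not show; `encodeLeaderURI` and `decodeLeaderURI` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeLeaderURI serializes the URI the way the publisher is assumed to.
func encodeLeaderURI(uri string) ([]byte, error) {
	return json.Marshal(uri)
}

// decodeLeaderURI mirrors the json.Unmarshal step of the applier.
func decodeLeaderURI(value []byte) (string, error) {
	var uri string
	if err := json.Unmarshal(value, &uri); err != nil {
		return "", err
	}
	return uri, nil
}

func main() {
	value, err := encodeLeaderURI("http://orchestrator-node-1:3000")
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	uri, err := decodeLeaderURI(value)
	fmt.Println(uri, err)
}
```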
14 changes: 14 additions & 0 deletions go/logic/orchestrator.go
Expand Up @@ -61,6 +61,8 @@ var discoveryQueueLengthGauge = metrics.NewGauge()
var discoveryRecentCountGauge = metrics.NewGauge()
var isElectedGauge = metrics.NewGauge()
var isHealthyGauge = metrics.NewGauge()
var isRaftHealthyGauge = metrics.NewGauge()
var isRaftLeaderGauge = metrics.NewGauge()
var discoveryMetrics = collection.CreateOrReturnCollection(discoveryMetricsName)

var isElectedNode int64 = 0
Expand All @@ -78,6 +80,8 @@ func init() {
metrics.Register("discoveries.recent_count", discoveryRecentCountGauge)
metrics.Register("elect.is_elected", isElectedGauge)
metrics.Register("health.is_healthy", isHealthyGauge)
metrics.Register("raft.is_healthy", isRaftHealthyGauge)
metrics.Register("raft.is_leader", isRaftLeaderGauge)

ometrics.OnMetricsTick(func() {
discoveryQueueLengthGauge.Update(int64(discoveryQueue.QueueLen()))
Expand All @@ -94,6 +98,16 @@ func init() {
ometrics.OnMetricsTick(func() {
isHealthyGauge.Update(atomic.LoadInt64(&process.LastContinousCheckHealthy))
})
ometrics.OnMetricsTick(func() {
var healthy int64
if orcraft.IsHealthy() {
healthy = 1
}
isRaftHealthyGauge.Update(healthy)
})
ometrics.OnMetricsTick(func() {
isRaftLeaderGauge.Update(atomic.LoadInt64(&isElectedNode))
})
}

func IsLeader() bool {
5 changes: 5 additions & 0 deletions go/logic/snapshot_data.go
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/github/orchestrator/go/db"
"github.com/github/orchestrator/go/inst"
"github.com/github/orchestrator/go/raft"

"github.com/openark/golib/log"
"github.com/openark/golib/sqlutils"
Expand All @@ -49,6 +50,8 @@ type SnapshotData struct {
KVStore,
Recovery,
RecoverySteps sqlutils.NamedResultData

LeaderURI string
}

func NewSnapshotData() *SnapshotData {
Expand Down Expand Up @@ -76,6 +79,7 @@ func writeTableData(tableName string, data *sqlutils.NamedResultData) error {
func CreateSnapshotData() *SnapshotData {
snapshotData := NewSnapshotData()

snapshotData.LeaderURI = orcraft.LeaderURI.Get()
// keys
snapshotData.Keys, _ = inst.ReadAllInstanceKeys()
snapshotData.MinimalInstances, _ = inst.ReadAllMinimalInstances()
Expand Down Expand Up @@ -136,6 +140,7 @@ func (this *SnapshotDataCreatorApplier) Restore(rc io.ReadCloser) error {
return err
}

orcraft.LeaderURI.Set(snapshotData.LeaderURI)
// keys
{
snapshotInstanceKeyMap := inst.NewInstanceKeyMap()
1 change: 0 additions & 1 deletion go/process/election_dao.go
Expand Up @@ -121,7 +121,6 @@ func GrabElection() error {
// Reelect clears the way for re-elections. Active node is immediately demoted.
func Reelect() error {
if orcraft.IsRaftEnabled() {
// return log.Errorf("Cannot Reelect on raft setup")
orcraft.StepDown()
}
_, err := db.ExecOrchestrator(`delete from active_node where anchor = 1`)
6 changes: 6 additions & 0 deletions go/process/health.go
Expand Up @@ -77,6 +77,9 @@ type HealthStatus struct {
ActiveNode NodeHealth
Error error
AvailableNodes [](*NodeHealth)
RaftLeader string
IsRaftLeader bool
RaftLeaderURI string
}

type OrchestratorExecutionMode string
Expand Down Expand Up @@ -118,6 +121,9 @@ func HealthTest() (health *HealthStatus, err error) {
if orcraft.IsRaftEnabled() {
health.ActiveNode.Hostname = orcraft.GetLeader()
health.IsActiveNode = orcraft.IsLeader()
health.RaftLeader = orcraft.GetLeader()
health.RaftLeaderURI = orcraft.LeaderURI.Get()
health.IsRaftLeader = orcraft.IsLeader()
} else {
if health.ActiveNode, health.IsActiveNode, err = ElectedNode(); err != nil {
health.Error = err
56 changes: 56 additions & 0 deletions go/raft/raft.go
Expand Up @@ -22,6 +22,7 @@ import (
"math/rand"
"net"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -50,6 +51,25 @@ var ThisHostname string

var fatalRaftErrorChan = make(chan error)

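// leaderURI holds the leader's advertised URI, guarded by a mutex.
type leaderURI struct {
	uri string
	sync.Mutex
}

var LeaderURI leaderURI

// Get returns the currently known leader URI.
func (luri *leaderURI) Get() string {
	luri.Lock()
	defer luri.Unlock()
	return luri.uri
}

// Set stores the leader URI.
func (luri *leaderURI) Set(uri string) {
	luri.Lock()
	defer luri.Unlock()
	luri.uri = uri
}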
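
The mutex-guarded getter/setter pattern used for `LeaderURI` can be tried out in a standalone program. The sketch below is a hypothetical variant, not the commit's type: `guardedString` uses a named mutex field instead of an embedded one, and `concurrentSetGet` is an illustrative driver.

```go
package main

import (
	"fmt"
	"sync"
)

// guardedString is a string that is safe for concurrent use.
// (Hypothetical stand-in for the leaderURI type.)
type guardedString struct {
	mu    sync.Mutex
	value string
}

// Get returns the current value.
func (g *guardedString) Get() string {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.value
}

// Set replaces the current value.
func (g *guardedString) Set(v string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.value = v
}

// concurrentSetGet hammers one guardedString from n goroutines and
// returns the final value.
func concurrentSetGet(n int) string {
	var g guardedString
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.Set("http://leader:3000")
			_ = g.Get()
		}()
	}
	wg.Wait()
	return g.Get()
}

func main() {
	fmt.Println(concurrentSetGet(50))
}
```

One design difference is worth noting: embedding `sync.Mutex`, as the commit does, promotes `Lock` and `Unlock` onto the exported `LeaderURI` variable, while a named field keeps locking an internal detail. Either form protects the string equally well.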

func IsRaftEnabled() bool {
return store != nil
}
Expand All @@ -61,6 +81,21 @@ func FatalRaftError(err error) error {
return err
}

// computeLeaderURI builds this node's URI from the configured protocol,
// the raft advertise hostname and the port of the HTTP listen address.
func computeLeaderURI() (uri string, err error) {
	protocol := "http"
	if config.Config.UseSSL {
		protocol = "https"
	}
	hostname := config.Config.RaftAdvertise
	listenTokens := strings.Split(config.Config.ListenAddress, ":")
	if len(listenTokens) < 2 {
		return uri, fmt.Errorf("computeLeaderURI: cannot determine listen port out of config.Config.ListenAddress: %+v", config.Config.ListenAddress)
	}
	port := listenTokens[1]
	uri = fmt.Sprintf("%s://%s:%s", protocol, hostname, port)
	return uri, nil
}
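
`computeLeaderURI` takes the second `:`-separated token of the listen address as the port, which works for values such as `:3000` but not for a bracketed IPv6 listen address. As a hedged alternative, not what the commit does, the same URI could be derived with `net.SplitHostPort`. The function name `leaderURIFor` and its parameters are hypothetical, and the sketch assumes the advertise value is a bare hostname or IP without a port.

```go
package main

import (
	"fmt"
	"net"
)

// leaderURIFor builds "<scheme>://<advertise>:<port>", taking the port
// from listenAddress. (Hypothetical variant of computeLeaderURI.)
func leaderURIFor(useSSL bool, advertise, listenAddress string) (string, error) {
	scheme := "http"
	if useSSL {
		scheme = "https"
	}
	// SplitHostPort understands ":3000", "host:3000" and "[::1]:3000".
	_, port, err := net.SplitHostPort(listenAddress)
	if err != nil {
		return "", fmt.Errorf("cannot determine listen port from %q: %w", listenAddress, err)
	}
	// JoinHostPort adds brackets when advertise is an IPv6 literal.
	return fmt.Sprintf("%s://%s", scheme, net.JoinHostPort(advertise, port)), nil
}

func main() {
	for _, addr := range []string{":3000", "[::1]:3000", "3000"} {
		uri, err := leaderURIFor(false, "orchestrator-node-1", addr)
		fmt.Printf("%q -> %q (err: %v)\n", addr, uri, err)
	}
}
```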

// Setup creates the entire raft shenanigans. Creates the store, associates with the throttler,
// contacts peer nodes, and subscribes to leader changes to export them.
func Setup(applier CommandApplier, snapshotCreatorApplier SnapshotCreatorApplier, thisHostname string) error {
Expand All @@ -87,6 +122,18 @@ func Setup(applier CommandApplier, snapshotCreatorApplier SnapshotCreatorApplier
return log.Errorf("failed to open raft store: %s", err.Error())
}

if leaderURI, err := computeLeaderURI(); err != nil {
return FatalRaftError(err)
} else {
leaderCh := store.raft.LeaderCh()
go func() {
for isTurnedLeader := range leaderCh {
if isTurnedLeader {
PublishCommand("leader-uri", leaderURI)
}
}
}()
}
setupHttpClient()

atomic.StoreInt64(&raftSetupComplete, 1)
Expand Down Expand Up @@ -175,6 +222,15 @@ func GetState() raft.RaftState {
return getRaft().State()
}

// IsHealthy checks whether this node is healthy in the raft group
func IsHealthy() bool {
	if !isRaftSetupComplete() {
		return false
	}
	state := GetState()
	return state == raft.Leader || state == raft.Follower
}
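
Since the `go/http/api.go` diff is not rendered here, the following is only a sketch of how a health rule like `IsHealthy` could back an endpoint that answers `200` or `500`, as the documentation describes for `/api/raft-health`. The `raftState` type is a local stand-in for the raft library's state enum, and `raftHealthHandler` and `statusFor` are hypothetical names.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// raftState is a local stand-in for the raft library's state type.
type raftState int

const (
	follower raftState = iota
	candidate
	leader
	shutdown
)

// isHealthyState applies the same rule as IsHealthy: leaders and
// followers are healthy, every other state is not.
func isHealthyState(s raftState) bool {
	return s == leader || s == follower
}

// raftHealthHandler answers 200 for a healthy state and 500 otherwise.
func raftHealthHandler(state func() raftState) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if isHealthyState(state()) {
			w.WriteHeader(http.StatusOK)
			return
		}
		w.WriteHeader(http.StatusInternalServerError)
	}
}

// statusFor runs the handler once for state s and returns the status code.
func statusFor(s raftState) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/api/raft-health", nil)
	raftHealthHandler(func() raftState { return s })(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor(leader), statusFor(follower), statusFor(candidate))
}
```

A node stuck in the candidate state, for example one isolated from the quorum and repeatedly calling elections, is reported unhealthy under this rule.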

func Snapshot() error {
future := getRaft().Snapshot()
return future.Error()
1 change: 0 additions & 1 deletion go/raft/store.go
Expand Up @@ -110,7 +110,6 @@ func (store *Store) Open(peerNodes []string) error {
if store.raft, err = raft.NewRaft(config, (*fsm)(store), logStore, logStore, snapshots, peerStore, transport); err != nil {
return fmt.Errorf("error creating new raft: %s", err)
}
store.raft.Yield()
store.peerStore = peerStore
log.Infof("new raft created")
