Skip to content

Commit

Permalink
migrate autopilot implementation to raft-autopilot
Browse files Browse the repository at this point in the history
Nomad's original autopilot was importing from a private package in Consul. It
has been moved out to a shared library. Switch Nomad to use this library so that
we can eliminate the import of Consul, which is necessary to build Nomad ENT
with the current version of the Consul SDK. This also will let us pick up
autopilot improvements shared with Consul more easily.
  • Loading branch information
tgross committed Sep 1, 2022
1 parent 99ebd0a commit 0163f19
Show file tree
Hide file tree
Showing 17 changed files with 447 additions and 216 deletions.
3 changes: 3 additions & 0 deletions .changelog/14441.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
autopilot: upgrade to raft-autopilot library
```
11 changes: 5 additions & 6 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@ package agent

import (
"context"
"fmt"
"io"
"net"
"net/http"
"strings"

"fmt"
"strconv"
"strings"
"time"

"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/raft"

"github.com/hashicorp/nomad/api"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
)

// OperatorRequest is used route operator/raft API requests to the implementing
Expand Down Expand Up @@ -184,7 +183,7 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re
return nil, nil
}

var reply autopilot.OperatorHealthReply
var reply structs.OperatorHealthReply
if err := s.agent.RPC("Operator.ServerHealth", &args, &reply); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/gosuri/uilive v0.0.4
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/hashicorp/consul v1.7.8
github.com/hashicorp/consul-template v0.29.3-0.20220829190305-21d2c9bb9752
github.com/hashicorp/consul/api v1.14.0
github.com/hashicorp/consul/sdk v0.11.0
Expand Down Expand Up @@ -78,6 +77,7 @@ require (
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/nomad/api v0.0.0-20220829153708-e1e5bb1dcefb
github.com/hashicorp/raft v1.3.9
github.com/hashicorp/raft-autopilot v0.1.6
github.com/hashicorp/raft-boltdb/v2 v2.2.0
github.com/hashicorp/serf v0.9.7
github.com/hashicorp/vault/api v1.7.2
Expand Down
66 changes: 5 additions & 61 deletions go.sum

Large diffs are not rendered by default.

229 changes: 185 additions & 44 deletions nomad/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package nomad
import (
"context"
"fmt"
"strconv"

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
autopilot "github.com/hashicorp/raft-autopilot"
"github.com/hashicorp/serf/serf"
)

Expand All @@ -20,88 +22,227 @@ const (
AutopilotVersionTag = "ap_version"
)

// AutopilotDelegate is a Nomad delegate for autopilot operations.
// AutopilotDelegate is a Nomad delegate for autopilot operations. It implements
// the autopilot.ApplicationIntegration interface and methods required for that
// have been documented as such below.
type AutopilotDelegate struct {
server *Server
}

var _ autopilot.ApplicationIntegration = &AutopilotDelegate{}

// AutopilotConfig is used to retrieve the latest configuration from the Nomad
// delegate. This method is required to implement the ApplicationIntegration
// interface.
func (d *AutopilotDelegate) AutopilotConfig() *autopilot.Config {
c := d.server.getOrCreateAutopilotConfig()
if c == nil {
return nil
}

conf := &autopilot.Config{
CleanupDeadServers: c.CleanupDeadServers,
LastContactThreshold: c.LastContactThreshold,
MaxTrailingLogs: c.MaxTrailingLogs,
MinQuorum: c.MinQuorum,
ServerStabilizationTime: c.ServerStabilizationTime,
DisableUpgradeMigration: c.DisableUpgradeMigration,
ModifyIndex: c.ModifyIndex,
CreateIndex: c.CreateIndex,
}

if c.EnableRedundancyZones {
conf.RedundancyZoneTag = AutopilotRZTag
}
if c.EnableCustomUpgrades {
conf.UpgradeVersionTag = AutopilotVersionTag
Ext: autopilotConfigExt(c),
}

return conf
}

func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []serf.Member) map[string]*autopilot.ServerStats {
// FetchServerStats will be called by autopilot to request Nomad fetch the
// server stats out of band. This method is required to implement the
// ApplicationIntegration interface
func (d *AutopilotDelegate) FetchServerStats(ctx context.Context, servers map[raft.ServerID]*autopilot.Server) map[raft.ServerID]*autopilot.ServerStats {
return d.server.statsFetcher.Fetch(ctx, servers)
}

func (d *AutopilotDelegate) IsServer(m serf.Member) (*autopilot.ServerInfo, error) {
ok, parts := isNomadServer(m)
if !ok || parts.Region != d.server.Region() {
return nil, nil
}

server := &autopilot.ServerInfo{
Name: m.Name,
ID: parts.ID,
Addr: parts.Addr,
Build: parts.Build,
Status: m.Status,
}
return server, nil
// KnownServers will be called by autopilot to request the list of servers known
// to Nomad. This method is required to implement the ApplicationIntegration
// interface
func (d *AutopilotDelegate) KnownServers() map[raft.ServerID]*autopilot.Server {
return d.server.autopilotServers()
}

// NotifyHealth heartbeats a metric for monitoring if we're the leader.
func (d *AutopilotDelegate) NotifyHealth(health autopilot.OperatorHealthReply) {
// NotifyState will be called when the autopilot state is updated. The Nomad
// leader heartbeats a metric for monitoring based on this information. This
// method is required to implement the ApplicationIntegration interface
func (d *AutopilotDelegate) NotifyState(state *autopilot.State) {
if d.server.raft.State() == raft.Leader {
metrics.SetGauge([]string{"nomad", "autopilot", "failure_tolerance"}, float32(health.FailureTolerance))
if health.Healthy {
metrics.SetGauge([]string{"nomad", "autopilot", "failure_tolerance"}, float32(state.FailureTolerance))
if state.Healthy {
metrics.SetGauge([]string{"nomad", "autopilot", "healthy"}, 1)
} else {
metrics.SetGauge([]string{"nomad", "autopilot", "healthy"}, 0)
}
}
}

func (d *AutopilotDelegate) PromoteNonVoters(conf *autopilot.Config, health autopilot.OperatorHealthReply) ([]raft.Server, error) {
future := d.server.raft.GetConfiguration()
if err := future.Error(); err != nil {
return nil, fmt.Errorf("failed to get raft configuration: %v", err)
// RemoveFailedServer will be called by autopilot to notify Nomad to remove the
// server in a failed state. This method is required to implement the
// ApplicationIntegration interface. (Note this is expected to return
// immediately so we'll spawn a goroutine for it.)
func (d *AutopilotDelegate) RemoveFailedServer(failedSrv *autopilot.Server) {
go func() {
err := d.server.RemoveFailedNode(failedSrv.Name)
if err != nil {
d.server.logger.Error("could not remove failed server", "server", string(failedSrv.ID))
}
}()
}

// MinRaftProtocol returns the lowest supported Raft protocol among alive
// servers
func (s *Server) MinRaftProtocol() (int, error) {
return minRaftProtocol(s.serf.Members(), isNomadServer)
}

// GetClusterHealth is used to get the current health of the servers, as known
// by the leader.
func (s *Server) GetClusterHealth() *structs.OperatorHealthReply {

state := s.autopilot.GetState()
if state == nil {
// this behavior seems odd but its functionally equivalent to 1.8.5 where if
// autopilot didn't have a health reply yet it would just return no error
return nil
}

health := &structs.OperatorHealthReply{
Healthy: state.Healthy,
FailureTolerance: state.FailureTolerance,
}

for _, srv := range state.Servers {
srvHealth := structs.ServerHealth{
ID: string(srv.Server.ID),
Name: srv.Server.Name,
Address: string(srv.Server.Address),
Version: srv.Server.Version,
Leader: srv.State == autopilot.RaftLeader,
Voter: srv.State == autopilot.RaftLeader || srv.State == autopilot.RaftVoter,
LastContact: srv.Stats.LastContact,
LastTerm: srv.Stats.LastTerm,
LastIndex: srv.Stats.LastIndex,
Healthy: srv.Health.Healthy,
StableSince: srv.Health.StableSince,
}

switch srv.Server.NodeStatus {
case autopilot.NodeAlive:
srvHealth.SerfStatus = serf.StatusAlive
case autopilot.NodeLeft:
srvHealth.SerfStatus = serf.StatusLeft
case autopilot.NodeFailed:
srvHealth.SerfStatus = serf.StatusFailed
default:
srvHealth.SerfStatus = serf.StatusNone
}

health.Servers = append(health.Servers, srvHealth)
}

return health
}

// -------------------
// helper functions

func minRaftProtocol(members []serf.Member, serverFunc func(serf.Member) (bool, *serverParts)) (int, error) {
minVersion := -1
for _, m := range members {
if m.Status != serf.StatusAlive {
continue
}

ok, server := serverFunc(m)
if !ok {
return -1, fmt.Errorf("not a Nomad server")
}
if server == nil {
continue
}

vsn, ok := m.Tags["raft_vsn"]
if !ok {
vsn = "1"
}
raftVsn, err := strconv.Atoi(vsn)
if err != nil {
return -1, err
}

if minVersion == -1 || raftVsn < minVersion {
minVersion = raftVsn
}
}

return autopilot.PromoteStableServers(conf, health, future.Configuration().Servers), nil
if minVersion == -1 {
return minVersion, fmt.Errorf("No servers found")
}

return minVersion, nil
}

func (d *AutopilotDelegate) Raft() *raft.Raft {
return d.server.raft
func (s *Server) autopilotServers() map[raft.ServerID]*autopilot.Server {
servers := make(map[raft.ServerID]*autopilot.Server)

for _, member := range s.serf.Members() {
srv, err := s.autopilotServer(member)
if err != nil {
s.logger.Warn("Error parsing server info", "name", member.Name, "error", err)
continue
} else if srv == nil {
// this member was a client
continue
}

servers[srv.ID] = srv
}

return servers
}

func (d *AutopilotDelegate) SerfLAN() *serf.Serf {
return d.server.serf
func (s *Server) autopilotServer(m serf.Member) (*autopilot.Server, error) {
ok, srv := isNomadServer(m)
if !ok {
return nil, nil
}

return s.autopilotServerFromMetadata(srv)
}

func (d *AutopilotDelegate) SerfWAN() *serf.Serf {
// serf WAN isn't supported in nomad yet
return nil
func (s *Server) autopilotServerFromMetadata(srv *serverParts) (*autopilot.Server, error) {
server := &autopilot.Server{
Name: srv.Name,
ID: raft.ServerID(srv.ID),
Address: raft.ServerAddress(srv.Addr.String()),
Version: srv.Build.String(),
RaftVersion: srv.RaftVersion,
Ext: s.autopilotServerExt(srv),
}

switch srv.Status {
case serf.StatusLeft:
server.NodeStatus = autopilot.NodeLeft
case serf.StatusAlive, serf.StatusLeaving:
// we want to treat leaving as alive to prevent autopilot from
// prematurely removing the node.
server.NodeStatus = autopilot.NodeAlive
case serf.StatusFailed:
server.NodeStatus = autopilot.NodeFailed
default:
server.NodeStatus = autopilot.NodeUnknown
}

members := s.serf.Members()
for _, member := range members {
if member.Name == srv.Name {
server.Meta = member.Tags
break
}
}

return server, nil
}
26 changes: 26 additions & 0 deletions nomad/autopilot_oss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//go:build !ent
// +build !ent

package nomad

import (
autopilot "github.com/hashicorp/raft-autopilot"

"github.com/hashicorp/nomad/nomad/structs"
)

func (s *Server) autopilotPromoter() autopilot.Promoter {
return autopilot.DefaultPromoter()
}

// autopilotServerExt returns the autopilot-enterprise.Server extensions needed
// for ENT feature support, but this is the empty OSS implementation.
func (s *Server) autopilotServerExt(_ *serverParts) interface{} {
return nil
}

// autopilotConfigExt returns the autopilot-enterprise.Config extensions needed
// for ENT feature support, but this is the empty OSS implementation.
func autopilotConfigExt(_ *structs.AutopilotConfig) interface{} {
return nil
}
Loading

0 comments on commit 0163f19

Please sign in to comment.