Rework server_auto_join to use a timer instead of the peer count.
It is perfectly viable for an admin to downsize a Nomad Server cluster
to 1, 2, or `num % 2 == 0` servers (however ill-advised such activities
may be).  Instead of relying on `bootstrap_expect`, use a timeout-based
strategy: if the `bootstrapFn` hasn't observed a leader within 15s it
falls back to Consul and polls every ~60s until it sees a leader.
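The strategy described above can be illustrated with a minimal, self-contained sketch. This is not Nomad's actual implementation: `hasLeader`, `queryConsulForServers`, and the constants below are placeholders standing in for the real Raft and Consul calls.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	maxStaleLeadership = 15 * time.Second
	peersPollInterval  = 45 * time.Second
)

// Placeholders for the real Raft leadership check and Consul catalog query.
func hasLeader() bool                 { return false }
func queryConsulForServers() []string { return []string{"10.0.0.1:4648"} }

func main() {
	// Expires if no Raft leader has been observed within maxStaleLeadership.
	staleTimer := time.NewTimer(maxStaleLeadership)

	// check is invoked periodically (e.g. by a syncer loop).
	check := func() {
		if hasLeader() {
			// A leader exists: push the staleness deadline out again.
			staleTimer.Reset(maxStaleLeadership)
			return
		}
		select {
		case <-staleTimer.C:
			// No leader seen for maxStaleLeadership: fall back to Consul
			// and retry on a jittered interval of roughly a minute.
			fmt.Println("falling back to Consul, found:", queryConsulForServers())
			staleTimer.Reset(peersPollInterval + time.Duration(rand.Int63n(int64(peersPollInterval/2))))
		default:
			// Timer has not expired yet; keep waiting for a leader.
		}
	}
	check()
}
```

The `select` with a `default` branch is a non-blocking check of whether the timer has fired, which mirrors the `leadershipTimedOut` closure added in the diff below.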
sean- committed Jun 16, 2016
1 parent 4d14988 commit a4cfb2b
Showing 1 changed file with 95 additions and 24 deletions.
119 changes: 95 additions & 24 deletions nomad/server.go
@@ -30,12 +30,21 @@ import (
)

const (
// datacenterQueryFactor sets the max number of DCs that a Nomad
// Server will query to find bootstrap_expect servers. If
// bootstrap_expect is 5, then the Nomad Server bootstrapFn handler
// will search through up to 25 Consul DCs to find possible Serf
// peers.
datacenterQueryFactor = 5
// datacenterQueryLimit sets the max number of DCs that a Nomad
// Server will query to find bootstrap_expect servers.
datacenterQueryLimit = 25

// maxStaleLeadership is the maximum time we will permit this Nomad
// Server to go without seeing a valid Raft leader.
maxStaleLeadership = 15 * time.Second

// peersPollInterval is used as the polling interval between attempts
// to query Consul for Nomad Servers.
peersPollInterval = 45 * time.Second

// peersPollJitterFactor is used to provide a slight amount of variance
// to the retry interval when querying Consul Servers.
peersPollJitterFactor = 2

raftState = "raft/"
serfSnapshot = "serf/snapshot"
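For reference, every Consul retry in the new bootstrapFn below resets peersTimeout to `peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)`. Assuming `lib.RandomStagger(d)` returns a random duration in `[0, d)`, the effective delay lands in the range computed by this small sketch, which is where the commit message's "~60s" comes from:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		peersPollInterval     = 45 * time.Second
		peersPollJitterFactor = 2
	)
	// Assumption: lib.RandomStagger(d) yields a random duration in [0, d),
	// so the retry delay falls in the half-open range printed below.
	lo := peersPollInterval
	hi := peersPollInterval + peersPollInterval/peersPollJitterFactor
	fmt.Printf("retry delay in [%s, %s)\n", lo, hi) // [45s, 1m7.5s)
}
```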
@@ -377,9 +386,31 @@ func (s *Server) Leave() error {
return nil
}

// setupConsulSyncer creates Server-mode consul.Syncer which periodically
// executes callbacks on a fixed interval.
func (s *Server) setupConsulSyncer() error {
// setupBootstrapHandler() creates the closure necessary to support a Consul
// fallback handler.
func (s *Server) setupBootstrapHandler() error {
// peersTimeout is used to indicate to the Consul Syncer that the
// current Nomad Server has a stale peer set. peersTimeout will time
// out if the Consul Syncer bootstrapFn has not observed a Raft
// leader in maxStaleLeadership. If peersTimeout has been triggered,
// the Consul Syncer will begin querying Consul for other Nomad
// Servers.
//
// NOTE: time.Timer is used vs time.Time in order to handle clock
// drift because time.Timer is implemented as a monotonic clock.
var peersTimeout *time.Timer = time.NewTimer(0)

// leadershipTimedOut is a helper function that returns true if the
// peersTimeout timer has expired.
leadershipTimedOut := func() bool {
select {
case <-peersTimeout.C:
return true
default:
return false
}
}

// The bootstrapFn callback handler is used to periodically poll
// Consul to look up the Nomad Servers in Consul. In the event the
// server has been brought up without a `retry-join` configuration
@@ -390,22 +421,46 @@ func (s *Server) setupConsulSyncer() error {
bootstrapFn := func() error {
// If there is a raft leader, do nothing
if s.raft.Leader() != "" {
peersTimeout.Reset(maxStaleLeadership)
return nil
}

// If the number of Raft peers is at least the min
// quorum, do nothing.
raftPeers, err := s.raftPeers.Peers()
minQuorum := (s.config.BootstrapExpect / 2) + 1
if err == nil && len(raftPeers) >= minQuorum {
return nil
// (ab)use serf.go's behavior of setting BootstrapExpect to
// zero once we have bootstrapped. If we have bootstrapped,
// rely on the leadership timeout; otherwise compare the Raft
// peer count against bootstrap_expect.
bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect)
if bootstrapExpect == 0 {
// This Nomad Server has been bootstrapped. Rely on
// timeouts to determine health.

if !leadershipTimedOut() {
return nil
}
} else {
// This Nomad Server has not been bootstrapped, reach
// out to Consul if our peer list is less than
// `bootstrap_expect`.
raftPeers, err := s.raftPeers.Peers()
if err != nil {
peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
return nil
}

// The number of Nomad Servers required for quorum has
// been reached; we do not need to poll Consul. Let the
// normal timeout-based strategy take over.
if len(raftPeers) >= int(bootstrapExpect) {
peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
return nil
}
}

s.logger.Printf("[TRACE] server.consul: lost contact with Nomad quorum, falling back to Consul for server list")
s.logger.Printf("[DEBUG] server.consul: lost contact with Nomad quorum, falling back to Consul for server list")

consulCatalog := s.consulSyncer.ConsulClient().Catalog()
dcs, err := consulCatalog.Datacenters()
if err != nil {
peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
return fmt.Errorf("server.consul: unable to query Consul datacenters: %v", err)
}
if len(dcs) > 2 {
@@ -417,7 +472,7 @@ func (s *Server) setupConsulSyncer() error {
nearestDC := dcs[0]
otherDCs := make([]string, 0, len(dcs))
shuffleStrings(otherDCs)
otherDCs = dcs[1:lib.MinInt(len(dcs), s.config.BootstrapExpect*datacenterQueryFactor)]
otherDCs = dcs[1:lib.MinInt(len(dcs), datacenterQueryLimit)]

dcs = append([]string{nearestDC}, otherDCs...)
}
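The hunk above keeps the nearest datacenter first and widens the search across a shuffled, capped selection of the remaining DCs. Here is a minimal sketch of that selection, assuming the shuffle helper randomizes a `[]string` in place (the real code uses Nomad's `shuffleStrings` and `lib.MinInt`); it is an illustration, not Nomad's exact code:

```go
package main

import (
	"fmt"
	"math/rand"
)

const datacenterQueryLimit = 25

// narrowDCs keeps the nearest DC (dcs[0]) first, then appends a shuffled
// selection of the remaining DCs, capping the total list at
// datacenterQueryLimit entries.
func narrowDCs(dcs []string) []string {
	if len(dcs) <= 2 {
		return dcs
	}
	nearest := dcs[0]
	others := append([]string(nil), dcs[1:]...)
	rand.Shuffle(len(others), func(i, j int) { others[i], others[j] = others[j], others[i] })
	if len(others) > datacenterQueryLimit-1 {
		others = others[:datacenterQueryLimit-1]
	}
	return append([]string{nearest}, others...)
}

func main() {
	fmt.Println(narrowDCs([]string{"dc1", "dc2", "dc3", "dc4"}))
}
```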
@@ -427,16 +482,16 @@ func (s *Server) setupConsulSyncer() error {
const defaultMaxNumNomadServers = 8
nomadServerServices := make([]string, 0, defaultMaxNumNomadServers)
for _, dc := range dcs {
opts := &consulapi.QueryOptions{
consulOpts := &consulapi.QueryOptions{
AllowStale: true,
Datacenter: dc,
Near: "_agent",
WaitTime: consul.DefaultQueryWaitDuration,
}
consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, opts)
consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, consulOpts)
if err != nil {
s.logger.Printf("[TRACE] server.consul: failed to query dc %+q's service %+q: %v", dc, nomadServerServiceName, err)
mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", nomadServerServiceName, dc, err))
s.logger.Printf("[WARN] server.consul: failed to query service %+q in Consul datacenter %+q: %v", nomadServerServiceName, dc, err)
mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %q from Consul datacenter %q: %v", nomadServerServiceName, dc, err))
continue
}

@@ -453,28 +508,44 @@ func (s *Server) setupConsulSyncer() error {

if len(nomadServerServices) == 0 {
if len(mErr.Errors) > 0 {
peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
return mErr.ErrorOrNil()
}

// Log the error and return nil so future handlers
// can attempt to register the `nomad` service.
s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters: %+q", nomadServerServiceName, dcs)
s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters %+q", nomadServerServiceName, dcs)
peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
return nil
}

numServersContacted, err := s.Join(nomadServerServices)
if err != nil {
peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err)
}

peersTimeout.Reset(maxStaleLeadership)
s.logger.Printf("[INFO] server.consul: successfully contacted %d Nomad Servers", numServersContacted)

return nil
}

s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn)
return nil
}

// setupConsulSyncer creates Server-mode consul.Syncer which periodically
// executes callbacks on a fixed interval.
func (s *Server) setupConsulSyncer() error {
var mErr multierror.Error
if s.config.ConsulConfig.ServerAutoJoin {
s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn)
if err := s.setupBootstrapHandler(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}

return nil
return mErr.ErrorOrNil()
}

// setupRPC is used to setup the RPC listener