
Merge pull request #3738 from hashicorp/f-sticky-clients
Client RPCs are sticky to server
dadgar committed Jan 26, 2018
2 parents 6a0eb7b + 5917bd9 commit 47d8fc5
Showing 16 changed files with 1,149 additions and 493 deletions.
97 changes: 59 additions & 38 deletions client/client.go
@@ -9,6 +9,7 @@ import (
"net/rpc"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
@@ -23,6 +24,7 @@ import (
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/servers"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/command/agent/consul"
@@ -111,8 +113,8 @@ type Client struct {

connPool *pool.ConnPool

// servers is the (optionally prioritized) list of nomad servers
servers *serverlist
// servers is the list of nomad servers
servers *servers.Manager

// heartbeat related times for tracking how often to heartbeat
lastHeartbeat time.Time
@@ -198,11 +200,13 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
servers: newServerList(),
triggerDiscoveryCh: make(chan struct{}),
serversDiscoveredCh: make(chan struct{}),
}

// Initialize the server manager
c.servers = servers.New(c.logger, c.shutdownCh, c)

// Initialize the client
if err := c.init(); err != nil {
return nil, fmt.Errorf("failed to initialize client: %v", err)
@@ -268,7 +272,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
// Setup Consul discovery if enabled
if c.configCopy.ConsulConfig.ClientAutoJoin != nil && *c.configCopy.ConsulConfig.ClientAutoJoin {
go c.consulDiscovery()
if len(c.servers.all()) == 0 {
if c.servers.NumServers() == 0 {
// No configured servers; trigger discovery manually
c.triggerDiscoveryCh <- struct{}{}
}
@@ -463,7 +467,7 @@ func (c *Client) Stats() map[string]map[string]string {
stats := map[string]map[string]string{
"client": {
"node_id": c.NodeID(),
"known_servers": c.servers.all().String(),
"known_servers": strings.Join(c.GetServers(), ","),
"num_allocations": strconv.Itoa(c.NumAllocs()),
"last_heartbeat": fmt.Sprintf("%v", time.Since(c.lastHeartbeat)),
"heartbeat_ttl": fmt.Sprintf("%v", c.heartbeatTTL),
@@ -548,32 +552,49 @@ func (c *Client) GetClientAlloc(allocID string) (*structs.Allocation, error) {

// GetServers returns the list of nomad servers this client is aware of.
func (c *Client) GetServers() []string {
endpoints := c.servers.all()
endpoints := c.servers.GetServers()
res := make([]string, len(endpoints))
for i := range endpoints {
res[i] = endpoints[i].addr.String()
res[i] = endpoints[i].String()
}
sort.Strings(res)
return res
}

// SetServers sets a new list of nomad servers to connect to. As long as one
// server is resolvable no error is returned.
func (c *Client) SetServers(servers []string) error {
endpoints := make([]*endpoint, 0, len(servers))
func (c *Client) SetServers(in []string) error {
var mu sync.Mutex
var wg sync.WaitGroup
var merr multierror.Error
for _, s := range servers {
addr, err := resolveServer(s)
if err != nil {
c.logger.Printf("[DEBUG] client: ignoring server %s due to resolution error: %v", s, err)
merr.Errors = append(merr.Errors, err)
continue
}

// Valid endpoint, append it without a priority as this API
// doesn't support different priorities for different servers
endpoints = append(endpoints, &endpoint{name: s, addr: addr})
endpoints := make([]*servers.Server, 0, len(in))
wg.Add(len(in))

for _, s := range in {
go func(srv string) {
defer wg.Done()
addr, err := resolveServer(srv)
if err != nil {
c.logger.Printf("[DEBUG] client: ignoring server %s due to resolution error: %v", srv, err)
merr.Errors = append(merr.Errors, err)
return
}

// Try to ping to check if it is a real server
if err := c.Ping(addr); err != nil {
merr.Errors = append(merr.Errors, fmt.Errorf("Server at address %s failed ping: %v", addr, err))
return
}

mu.Lock()
endpoints = append(endpoints, &servers.Server{Addr: addr})
mu.Unlock()
}(s)
}

wg.Wait()

// Only return errors if no servers are valid
if len(endpoints) == 0 {
if len(merr.Errors) > 0 {
@@ -582,7 +603,7 @@ func (c *Client) SetServers(servers []string) error {
return noServersErr
}

c.servers.set(endpoints)
c.servers.SetServers(endpoints)
return nil
}

@@ -1228,26 +1249,25 @@ func (c *Client) updateNodeStatus() error {
}
}

// Convert []*NodeServerInfo to []*endpoints
localdc := c.Datacenter()
servers := make(endpoints, 0, len(resp.Servers))
// Update the number of nodes in the cluster so we can adjust our server
// rebalance rate.
c.servers.SetNumNodes(resp.NumNodes)

// Convert []*NodeServerInfo to []*servers.Server
nomadServers := make([]*servers.Server, 0, len(resp.Servers))
for _, s := range resp.Servers {
addr, err := resolveServer(s.RPCAdvertiseAddr)
if err != nil {
c.logger.Printf("[WARN] client: ignoring invalid server %q: %v", s.RPCAdvertiseAddr, err)
continue
}
e := endpoint{name: s.RPCAdvertiseAddr, addr: addr}
if s.Datacenter != localdc {
// server is non-local; de-prioritize
e.priority = 1
}
servers = append(servers, &e)
e := &servers.Server{DC: s.Datacenter, Addr: addr}
nomadServers = append(nomadServers, e)
}
if len(servers) == 0 {
return fmt.Errorf("server returned no valid servers")
if len(nomadServers) == 0 {
return fmt.Errorf("heartbeat response returned no valid servers")
}
c.servers.set(servers)
c.servers.SetServers(nomadServers)

// Begin polling Consul if there is no Nomad leader. We could be
// heartbeating to a Nomad server that is in the minority of a
@@ -1840,7 +1860,7 @@ func (c *Client) consulDiscoveryImpl() error {

serviceName := c.configCopy.ConsulConfig.ServerServiceName
var mErr multierror.Error
var servers endpoints
var nomadServers servers.Servers
c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %+q", dcs)
DISCOLOOP:
for _, dc := range dcs {
@@ -1880,22 +1900,23 @@ DISCOLOOP:
if err != nil {
mErr.Errors = append(mErr.Errors, err)
}
servers = append(servers, &endpoint{name: p, addr: addr})
srv := &servers.Server{Addr: addr}
nomadServers = append(nomadServers, srv)
}
if len(servers) > 0 {
if len(nomadServers) > 0 {
break DISCOLOOP
}
}
}
if len(servers) == 0 {
if len(nomadServers) == 0 {
if len(mErr.Errors) > 0 {
return mErr.ErrorOrNil()
}
return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %+q", serviceName, dcs)
}

c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", servers)
c.servers.set(servers)
c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", nomadServers)
c.servers.SetServers(nomadServers)

// Notify waiting rpc calls. If a goroutine just failed an RPC call and
// isn't receiving on this chan yet they'll still retry eventually.
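
The client half of this diff depends on the new client/servers package, which this PR adds but which is not shown in this excerpt. Below is a minimal sketch of the Manager surface as inferred purely from the call sites above; the signatures and the rotate-on-failure behavior are assumptions, and the real manager.go also rebalances the list in the background.

package servers

import (
	"log"
	"net"
	"sync"
)

// Server is a single known Nomad server endpoint.
type Server struct {
	Addr net.Addr
	DC   string // the real Manager uses this to prefer local-DC servers
}

func (s *Server) String() string { return s.Addr.String() }

// Servers is an ordered list of servers; the head is the "sticky" one.
type Servers []*Server

// Pinger is the health check used when cycling servers; *client.Client
// satisfies it via the Ping method added in client/rpc.go below.
type Pinger interface {
	Ping(addr net.Addr) error
}

// Manager owns the server list for a client.
type Manager struct {
	mu         sync.Mutex
	servers    Servers
	logger     *log.Logger
	shutdownCh chan struct{}
	pinger     Pinger
	numNodes   int32
}

func New(logger *log.Logger, shutdownCh chan struct{}, pinger Pinger) *Manager {
	return &Manager{logger: logger, shutdownCh: shutdownCh, pinger: pinger}
}

// FindServer returns the current sticky server, or nil when none are known.
func (m *Manager) FindServer() *Server {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.servers) == 0 {
		return nil
	}
	return m.servers[0]
}

// NotifyFailedServer rotates a failed server to the tail so the next
// FindServer call hands back a different one.
func (m *Manager) NotifyFailedServer(s *Server) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.servers) > 1 && m.servers[0] == s {
		m.servers = append(m.servers[1:], s)
	}
}

// SetServers replaces the list wholesale.
func (m *Manager) SetServers(srvs Servers) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.servers = srvs
}

// GetServers returns a copy so callers cannot mutate internal state.
func (m *Manager) GetServers() Servers {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := make(Servers, len(m.servers))
	copy(out, m.servers)
	return out
}

// NumServers reports how many servers are currently known.
func (m *Manager) NumServers() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.servers)
}

// SetNumNodes records cluster size; the real Manager derives its rebalance
// interval from this so large clusters do not reshuffle too aggressively.
func (m *Manager) SetNumNodes(n int32) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.numNodes = n
}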
24 changes: 24 additions & 0 deletions client/client_test.go
@@ -904,3 +904,27 @@ func TestClient_ValidateMigrateToken_ACLDisabled(t *testing.T) {

assert.Equal(c.ValidateMigrateToken("", ""), true)
}

// TestClient_ServerList tests client methods that interact with the internal
// nomad server list.
func TestClient_ServerList(t *testing.T) {
t.Parallel()
client := TestClient(t, func(c *config.Config) {})

if s := client.GetServers(); len(s) != 0 {
t.Fatalf("expected server lit to be empty but found: %+q", s)
}
if err := client.SetServers(nil); err != noServersErr {
t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err)
}
if err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil {
t.Fatalf("expected setting a bad server to return an error")
}
if err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err == nil {
t.Fatalf("expected setting at least one good server to succeed but received: %v", err)
}
s := client.GetServers()
if len(s) != 0 {
t.Fatalf("expected 2 servers but received: %+q", s)
}
}
8 changes: 8 additions & 0 deletions client/config/config.go
@@ -195,6 +195,13 @@ type Config struct {
// BackwardsCompatibleMetrics determines whether to show methods of
// displaying metrics for older versions, or to only show the new format
BackwardsCompatibleMetrics bool

// RPCHoldTimeout is how long an RPC can be "held" before it is errored.
// This is used to paper over a loss of leadership by instead holding RPCs,
// so that the caller experiences a slow response rather than an error.
// This period is meant to be long enough for a leader election to take
// place, and a small jitter is applied to avoid a thundering herd.
RPCHoldTimeout time.Duration
}

func (c *Config) Copy() *Config {
@@ -228,6 +235,7 @@ func DefaultConfig() *Config {
NoHostUUID: true,
DisableTaggedMetrics: false,
BackwardsCompatibleMetrics: false,
RPCHoldTimeout: 5 * time.Second,
}
}

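
RPCHoldTimeout pairs with the jitter applied in client/rpc.go below: between retries, the client sleeps a random fraction of the hold window. A rough sketch of the arithmetic, assuming structs.JitterFraction is 16 (Nomad's value at the time of this PR; treat the constant as illustrative):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomStagger mirrors consul/lib.RandomStagger: a uniformly random
// duration in [0, intv).
func randomStagger(intv time.Duration) time.Duration {
	if intv <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(intv)))
}

func main() {
	rpcHoldTimeout := 5 * time.Second   // DefaultConfig value above
	jitterFraction := time.Duration(16) // assumed structs.JitterFraction

	// Ceiling on each inter-retry sleep: 5s / 16 = 312.5ms.
	maxJitter := rpcHoldTimeout / jitterFraction
	fmt.Println("max sleep between retries:", maxJitter)

	// With an expected sleep of ~156ms, a client riding out a leader
	// election gets on the order of 30 retries inside the 5s hold window
	// before the error is finally surfaced to the caller.
	fmt.Println("sample sleep:", randomStagger(maxJitter))
}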
75 changes: 59 additions & 16 deletions client/rpc.go
@@ -1,16 +1,17 @@
package client

import (
"fmt"
"io"
"net"
"net/rpc"
"strings"
"time"

metrics "github.com/armon/go-metrics"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/nomad/helper/codec"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/yamux"
)

Expand Down Expand Up @@ -39,26 +40,60 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
return c.config.RPCHandler.RPC(method, args, reply)
}

servers := c.servers.all()
if len(servers) == 0 {
// This is subtle but we start measuring the time on the client side
// right at the time of the first request, vs. on the first retry as
// is done on the server side inside forward(). This is because the
// servers may already be applying the RPCHoldTimeout up there, so by
// starting the timer here we won't potentially double up the delay.
firstCheck := time.Now()

TRY:
server := c.servers.FindServer()
if server == nil {
return noServersErr
}

var mErr multierror.Error
for _, s := range servers {
// Make the RPC request
if err := c.connPool.RPC(c.Region(), s.addr, c.RPCMajorVersion(), method, args, reply); err != nil {
errmsg := fmt.Errorf("RPC failed to server %s: %v", s.addr, err)
mErr.Errors = append(mErr.Errors, errmsg)
c.logger.Printf("[DEBUG] client: %v", errmsg)
c.servers.failed(s)
continue
}
c.servers.good(s)
// Make the request.
rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)
if rpcErr == nil {
return nil
}

return mErr.ErrorOrNil()
// Move off to another server, and see if we can retry.
c.logger.Printf("[ERR] nomad: %q RPC failed to server %s: %v", method, server.Addr, rpcErr)
c.servers.NotifyFailedServer(server)
if retry := canRetry(args, rpcErr); !retry {
return rpcErr
}

// We can wait a bit and retry!
if time.Since(firstCheck) < c.config.RPCHoldTimeout {
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)
select {
case <-time.After(jitter):
goto TRY
case <-c.shutdownCh:
}
}
return rpcErr
}

// canRetry returns true if the given situation is safe for a retry.
func canRetry(args interface{}, err error) bool {
// No leader errors are always safe to retry since no state could have
// been changed.
if structs.IsErrNoLeader(err) {
return true
}

// Reads are safe to retry for stream errors, such as if a server was
// being shut down.
info, ok := args.(structs.RPCInfo)
if ok && info.IsRead() && lib.IsErrEOF(err) {
return true
}

return false
}
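
Since canRetry is the gate on the whole retry loop, a table test makes its contract explicit. A sketch of how one might look in client/rpc_test.go; structs.ErrNoLeader and the choice of request types (GenericRequest embeds QueryOptions, NodeUpdateStatusRequest embeds WriteRequest) are assumptions about nomad/structs at this commit:

package client

import (
	"io"
	"testing"

	"github.com/hashicorp/nomad/nomad/structs"
)

func TestCanRetry(t *testing.T) {
	cases := []struct {
		name string
		args interface{}
		err  error
		want bool
	}{
		// A no-leader error never mutated state, so any request may retry.
		{"no leader", &structs.GenericRequest{}, structs.ErrNoLeader, true},
		// Reads are safe to retry across a severed stream (EOF)...
		{"read EOF", &structs.GenericRequest{}, io.EOF, true},
		// ...but a write may already have been applied, so it must not.
		{"write EOF", &structs.NodeUpdateStatusRequest{}, io.EOF, false},
	}
	for _, c := range cases {
		if got := canRetry(c.args, c.err); got != c.want {
			t.Fatalf("%s: got %v, want %v", c.name, got, c.want)
		}
	}
}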

// setupClientRpc is used to setup the Client's RPC endpoints
@@ -159,3 +194,11 @@ func resolveServer(s string) (net.Addr, error) {
}
return net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port))
}

// Ping sends a Status.Ping RPC to the given server, returning an error if
// the server is unhealthy or unreachable.
func (c *Client) Ping(srv net.Addr) error {
var reply struct{}
err := c.connPool.RPC(c.Region(), srv, c.RPCMajorVersion(), "Status.Ping", struct{}{}, &reply)
return err
}
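
For orientation, a hypothetical caller of the two new entry points; the agent's server-update endpoint is the real consumer, so the function and package names here are illustrative only:

package clientdemo

import (
	"log"
	"net"

	"github.com/hashicorp/nomad/client"
)

// updateServers sketches the new flow: Ping does a single Status.Ping
// round-trip, and SetServers resolves and pings every address concurrently,
// leaving the old list untouched if no address survives both checks.
func updateServers(c *client.Client, addrs []string) {
	for _, a := range addrs {
		tcp, err := net.ResolveTCPAddr("tcp", a)
		if err != nil {
			continue // unresolvable; SetServers would skip it too
		}
		if err := c.Ping(tcp); err != nil {
			log.Printf("server %s failed ping: %v", tcp, err)
		}
	}
	if err := c.SetServers(addrs); err != nil {
		// Every address failed resolution or the ping round-trip.
		log.Printf("no usable servers: %v", err)
	}
}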
