Skip to content

Commit

Permalink
have registerNode handle NodeUpdateResponse
Browse files Browse the repository at this point in the history
We don't really have to worry about regions in the Consul discovery step. If we
hit a different region and they're federated, the request will be forwarded. If
the regions aren't federated (not a very safe topology in general), the node
registration will fail and we'll retry.

Eliminate the region tags we added to Consul. Have `registerNode` update the
server list based on the response we get, and have it return the "no servers"
error if we get no servers so that we kick off discovery again and retry
immediately rather than 15s later.
  • Loading branch information
tgross committed Mar 16, 2023
1 parent 25d0361 commit 307a0c2
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 32 deletions.
68 changes: 38 additions & 30 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ import (
vaultapi "github.com/hashicorp/vault/api"
"github.com/shirou/gopsutil/v3/host"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)

const (
Expand Down Expand Up @@ -1971,6 +1970,11 @@ func (c *Client) registerNode() error {
return err
}

err := c.handleNodeUpdateResponse(resp)
if err != nil {
return err
}

// Update the node status to ready after we register.
c.UpdateConfig(func(c *config.Config) {
c.Node.Status = structs.NodeStatusReady
Expand All @@ -1985,6 +1989,7 @@ func (c *Client) registerNode() error {
defer c.heartbeatLock.Unlock()
c.heartbeatStop.setLastOk(time.Now())
c.heartbeatTTL = resp.HeartbeatTTL

return nil
}

Expand Down Expand Up @@ -2036,6 +2041,25 @@ func (c *Client) updateNodeStatus() error {
}
})

err := c.handleNodeUpdateResponse(resp)
if err != nil {
return fmt.Errorf("heartbeat response returned no valid servers")
}

// Begin polling Consul if there is no Nomad leader. We could be
// heartbeating to a Nomad server that is in the minority of a
// partition of the Nomad server quorum, but this Nomad Agent still
// has connectivity to the existing majority of Nomad Servers, but
// only if it queries Consul.
if resp.LeaderRPCAddr == "" {
c.triggerDiscovery()
}

c.EnterpriseClient.SetFeatures(resp.Features)
return nil
}

func (c *Client) handleNodeUpdateResponse(resp structs.NodeUpdateResponse) error {
// Update the number of nodes in the cluster so we can adjust our server
// rebalance rate.
c.servers.SetNumNodes(resp.NumNodes)
Expand All @@ -2052,20 +2076,9 @@ func (c *Client) updateNodeStatus() error {
nomadServers = append(nomadServers, e)
}
if len(nomadServers) == 0 {
return fmt.Errorf("heartbeat response returned no valid servers")
return noServersErr
}
c.servers.SetServers(nomadServers)

// Begin polling Consul if there is no Nomad leader. We could be
// heartbeating to a Nomad server that is in the minority of a
// partition of the Nomad server quorum, but this Nomad Agent still
// has connectivity to the existing majority of Nomad Servers, but
// only if it queries Consul.
if resp.LeaderRPCAddr == "" {
c.triggerDiscovery()
}

c.EnterpriseClient.SetFeatures(resp.Features)
return nil
}

Expand Down Expand Up @@ -2907,9 +2920,6 @@ func (c *Client) consulDiscoveryImpl() error {
dcs = dcs[0:helper.Min(len(dcs), datacenterQueryLimit)]
}

// Query for servers in this client's region only
region := c.Region()

serviceName := c.GetConfig().ConsulConfig.ServerServiceName
var mErr multierror.Error
var nomadServers servers.Servers
Expand All @@ -2929,21 +2939,19 @@ DISCOLOOP:
}

for _, s := range consulServices {
if slices.Contains(s.ServiceTags, region) {
port := strconv.Itoa(s.ServicePort)
addrstr := s.ServiceAddress
if addrstr == "" {
addrstr = s.Address
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(addrstr, port))
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}

srv := &servers.Server{Addr: addr}
nomadServers = append(nomadServers, srv)
port := strconv.Itoa(s.ServicePort)
addrstr := s.ServiceAddress
if addrstr == "" {
addrstr = s.Address
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(addrstr, port))
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}

srv := &servers.Server{Addr: addr}
nomadServers = append(nomadServers, srv)
}

if len(nomadServers) > 0 {
Expand Down
3 changes: 1 addition & 2 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,8 +891,7 @@ func (a *Agent) setupServer() error {
rpcServ := &structs.Service{
Name: a.config.Consul.ServerServiceName,
PortLabel: a.config.AdvertiseAddrs.RPC,
Tags: append([]string{consul.ServiceTagRPC, a.config.Region},
a.config.Consul.Tags...),
Tags: append([]string{consul.ServiceTagRPC}, a.config.Consul.Tags...),
Checks: []*structs.ServiceCheck{
{
Name: a.config.Consul.ServerRPCCheckName,
Expand Down

0 comments on commit 307a0c2

Please sign in to comment.