From a7a6cdf6d0c3cf6a2f61b8e8054ad7705ba11280 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Thu, 16 Mar 2023 11:04:34 -0400
Subject: [PATCH] have registerNode handle NodeUpdateResponse

We don't really have to worry about regions in the Consul discovery
step. If we hit a different region and they're federated, the request
will be forwarded. If the regions aren't federated (not a very safe
topology in general), the node registration will fail and we'll retry.
Eliminate the region tags we added to Consul.

Have `registerNode` update the server list based on the response we
get, and have it return the "no servers" error if we get no servers so
that we kick off discovery again and retry immediately rather than 15s
later.
---
 client/client.go       | 70 +++++++++++++++++++++++-------------------
 command/agent/agent.go |  3 +-
 2 files changed, 40 insertions(+), 33 deletions(-)

diff --git a/client/client.go b/client/client.go
index 2dcd25457576..5a62aa4665a0 100644
--- a/client/client.go
+++ b/client/client.go
@@ -58,7 +58,6 @@ import (
 	vaultapi "github.com/hashicorp/vault/api"
 	"github.com/shirou/gopsutil/v3/host"
 	"golang.org/x/exp/maps"
-	"golang.org/x/exp/slices"
 )
 
 const (
@@ -1944,7 +1943,7 @@ func (c *Client) retryRegisterNode() {
 		}
 
 		retryIntv := registerRetryIntv
-		if err == noServersErr {
+		if err == noServersErr || structs.IsErrNoRegionPath(err) {
 			c.logger.Debug("registration waiting on servers")
 			c.triggerDiscovery()
 			retryIntv = noServerRetryIntv
@@ -1971,6 +1970,11 @@ func (c *Client) registerNode() error {
 		return err
 	}
 
+	err := c.handleNodeUpdateResponse(resp)
+	if err != nil {
+		return err
+	}
+
 	// Update the node status to ready after we register.
 	c.UpdateConfig(func(c *config.Config) {
 		c.Node.Status = structs.NodeStatusReady
@@ -1985,6 +1989,7 @@ func (c *Client) registerNode() error {
 	defer c.heartbeatLock.Unlock()
 	c.heartbeatStop.setLastOk(time.Now())
 	c.heartbeatTTL = resp.HeartbeatTTL
+
 	return nil
 }
 
@@ -2036,6 +2041,25 @@ func (c *Client) updateNodeStatus() error {
 		}
 	})
 
+	err := c.handleNodeUpdateResponse(resp)
+	if err != nil {
+		return fmt.Errorf("heartbeat response returned no valid servers")
+	}
+
+	// Begin polling Consul if there is no Nomad leader. We could be
+	// heartbeating to a Nomad server that is in the minority of a
+	// partition of the Nomad server quorum, but this Nomad Agent still
+	// has connectivity to the existing majority of Nomad Servers, but
+	// only if it queries Consul.
+	if resp.LeaderRPCAddr == "" {
+		c.triggerDiscovery()
+	}
+
+	c.EnterpriseClient.SetFeatures(resp.Features)
+	return nil
+}
+
+func (c *Client) handleNodeUpdateResponse(resp structs.NodeUpdateResponse) error {
 	// Update the number of nodes in the cluster so we can adjust our server
 	// rebalance rate.
 	c.servers.SetNumNodes(resp.NumNodes)
@@ -2052,20 +2076,9 @@ func (c *Client) updateNodeStatus() error {
 		nomadServers = append(nomadServers, e)
 	}
 	if len(nomadServers) == 0 {
-		return fmt.Errorf("heartbeat response returned no valid servers")
+		return noServersErr
 	}
 	c.servers.SetServers(nomadServers)
-
-	// Begin polling Consul if there is no Nomad leader. We could be
-	// heartbeating to a Nomad server that is in the minority of a
-	// partition of the Nomad server quorum, but this Nomad Agent still
-	// has connectivity to the existing majority of Nomad Servers, but
-	// only if it queries Consul.
-	if resp.LeaderRPCAddr == "" {
-		c.triggerDiscovery()
-	}
-
-	c.EnterpriseClient.SetFeatures(resp.Features)
 	return nil
 }
 
@@ -2907,9 +2920,6 @@ func (c *Client) consulDiscoveryImpl() error {
 		dcs = dcs[0:helper.Min(len(dcs), datacenterQueryLimit)]
 	}
 
-	// Query for servers in this client's region only
-	region := c.Region()
-
 	serviceName := c.GetConfig().ConsulConfig.ServerServiceName
 	var mErr multierror.Error
 	var nomadServers servers.Servers
@@ -2929,21 +2939,19 @@ DISCOLOOP:
 		}
 
 		for _, s := range consulServices {
-			if slices.Contains(s.ServiceTags, region) {
-				port := strconv.Itoa(s.ServicePort)
-				addrstr := s.ServiceAddress
-				if addrstr == "" {
-					addrstr = s.Address
-				}
-				addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(addrstr, port))
-				if err != nil {
-					mErr.Errors = append(mErr.Errors, err)
-					continue
-				}
-
-				srv := &servers.Server{Addr: addr}
-				nomadServers = append(nomadServers, srv)
+			port := strconv.Itoa(s.ServicePort)
+			addrstr := s.ServiceAddress
+			if addrstr == "" {
+				addrstr = s.Address
 			}
+			addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(addrstr, port))
+			if err != nil {
+				mErr.Errors = append(mErr.Errors, err)
+				continue
+			}
+
+			srv := &servers.Server{Addr: addr}
+			nomadServers = append(nomadServers, srv)
 		}
 
 		if len(nomadServers) > 0 {
diff --git a/command/agent/agent.go b/command/agent/agent.go
index 6dfac3bcc1dd..13c896c7fdba 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go
@@ -891,8 +891,7 @@ func (a *Agent) setupServer() error {
 	rpcServ := &structs.Service{
 		Name: a.config.Consul.ServerServiceName,
 		PortLabel: a.config.AdvertiseAddrs.RPC,
-		Tags: append([]string{consul.ServiceTagRPC, a.config.Region},
-			a.config.Consul.Tags...),
+		Tags: append([]string{consul.ServiceTagRPC}, a.config.Consul.Tags...),
 		Checks: []*structs.ServiceCheck{
 			{
 				Name: a.config.Consul.ServerRPCCheckName,
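
Note (illustration only, not part of the patch): the contract described in the
commit message can be sketched as a small, self-contained Go program: record
the cluster size and server list from the update response, return the "no
servers" sentinel when the response is empty, and let the caller trigger
Consul discovery and retry on the short interval. The types nodeUpdateResponse
and serverTracker and both functions below are hypothetical stand-ins, not
Nomad's real client, servers manager, or structs.NodeUpdateResponse.

package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical stand-ins; names, shapes, and intervals are illustrative only.
var noServersErr = errors.New("no servers")

const (
	registerRetryIntv = 15 * time.Second // normal registration retry
	noServerRetryIntv = time.Second      // short retry while discovering servers
)

type nodeUpdateResponse struct {
	Servers  []string // RPC addresses advertised in the response
	NumNodes int
}

type serverTracker struct {
	numNodes int
	servers  []string
}

// handleNodeUpdateResponse mirrors the contract of the new helper: record the
// cluster size, replace the tracked server list, and return the sentinel
// error when the response carries no servers.
func (t *serverTracker) handleNodeUpdateResponse(resp nodeUpdateResponse) error {
	t.numNodes = resp.NumNodes
	if len(resp.Servers) == 0 {
		return noServersErr
	}
	t.servers = resp.Servers
	return nil
}

// pickRetryInterval mirrors the branch in retryRegisterNode: a "no servers"
// result triggers discovery and selects the short retry interval.
func pickRetryInterval(err error, triggerDiscovery func()) time.Duration {
	if errors.Is(err, noServersErr) {
		triggerDiscovery()
		return noServerRetryIntv
	}
	return registerRetryIntv
}

func main() {
	t := &serverTracker{}
	err := t.handleNodeUpdateResponse(nodeUpdateResponse{NumNodes: 3})
	intv := pickRetryInterval(err, func() { fmt.Println("triggering Consul discovery") })
	fmt.Println("retrying registration in", intv)
}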