Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: don't use Status RPC for Consul discovery (#16490) #16490

Merged
merged 7 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/16490.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
client: Fixed a bug where clients using Consul discovery to join the cluster would get permission denied errors
```
75 changes: 30 additions & 45 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1943,7 +1943,7 @@ func (c *Client) retryRegisterNode() {
}

retryIntv := registerRetryIntv
if err == noServersErr {
if err == noServersErr || structs.IsErrNoRegionPath(err) {
c.logger.Debug("registration waiting on servers")
c.triggerDiscovery()
retryIntv = noServerRetryIntv
Expand All @@ -1970,6 +1970,11 @@ func (c *Client) registerNode() error {
return err
}

err := c.handleNodeUpdateResponse(resp)
if err != nil {
return err
}

// Update the node status to ready after we register.
c.UpdateConfig(func(c *config.Config) {
c.Node.Status = structs.NodeStatusReady
Expand All @@ -1984,6 +1989,7 @@ func (c *Client) registerNode() error {
defer c.heartbeatLock.Unlock()
c.heartbeatStop.setLastOk(time.Now())
c.heartbeatTTL = resp.HeartbeatTTL

return nil
}

Expand Down Expand Up @@ -2035,6 +2041,22 @@ func (c *Client) updateNodeStatus() error {
}
})

err := c.handleNodeUpdateResponse(resp)
if err != nil {
return fmt.Errorf("heartbeat response returned no valid servers")
}

// If there's no Leader in the response we may be talking to a partitioned
// server. Redo discovery to ensure our server list is up to date.
if resp.LeaderRPCAddr == "" {
c.triggerDiscovery()
}

c.EnterpriseClient.SetFeatures(resp.Features)
return nil
}

func (c *Client) handleNodeUpdateResponse(resp structs.NodeUpdateResponse) error {
// Update the number of nodes in the cluster so we can adjust our server
// rebalance rate.
c.servers.SetNumNodes(resp.NumNodes)
Expand All @@ -2051,20 +2073,9 @@ func (c *Client) updateNodeStatus() error {
nomadServers = append(nomadServers, e)
}
if len(nomadServers) == 0 {
return fmt.Errorf("heartbeat response returned no valid servers")
return noServersErr
}
c.servers.SetServers(nomadServers)

// Begin polling Consul if there is no Nomad leader. We could be
// heartbeating to a Nomad server that is on the minority side of a
// partitioned Nomad server quorum, while this Nomad agent can still
// reach the majority of Nomad servers, but only if it queries
// Consul.
if resp.LeaderRPCAddr == "" {
c.triggerDiscovery()
}

c.EnterpriseClient.SetFeatures(resp.Features)
return nil
}

Expand Down Expand Up @@ -2906,14 +2917,6 @@ func (c *Client) consulDiscoveryImpl() error {
dcs = dcs[0:helper.Min(len(dcs), datacenterQueryLimit)]
}

// Query for servers in this client's region only
region := c.Region()
rpcargs := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Region: region,
},
}

serviceName := c.GetConfig().ConsulConfig.ServerServiceName
var mErr multierror.Error
var nomadServers servers.Servers
Expand Down Expand Up @@ -2944,32 +2947,14 @@ DISCOLOOP:
continue
}

// Query the members from the region that Consul gave us, and
// extract the client-advertise RPC address from each member
var membersResp structs.ServerMembersResponse
if err := c.connPool.RPC(region, addr, "Status.Members", rpcargs, &membersResp); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
for _, member := range membersResp.Members {
if addrTag, ok := member.Tags["rpc_addr"]; ok {
if portTag, ok := member.Tags["port"]; ok {
addr, err := net.ResolveTCPAddr("tcp",
fmt.Sprintf("%s:%s", addrTag, portTag))
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
srv := &servers.Server{Addr: addr}
nomadServers = append(nomadServers, srv)
}
}
}
srv := &servers.Server{Addr: addr}
nomadServers = append(nomadServers, srv)
}

if len(nomadServers) > 0 {
break DISCOLOOP
}
if len(nomadServers) > 0 {
break DISCOLOOP
}

}
if len(nomadServers) == 0 {
if len(mErr.Errors) > 0 {
Expand Down