Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: don't use Status RPC for Consul discovery (#16490) #16490

Merged
merged 7 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/16490.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
client: Fixed a bug where clients using Consul discovery to join the cluster would get permission denied errors
```
56 changes: 18 additions & 38 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
vaultapi "github.com/hashicorp/vault/api"
"github.com/shirou/gopsutil/v3/host"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)

const (
Expand Down Expand Up @@ -2908,11 +2909,6 @@ func (c *Client) consulDiscoveryImpl() error {

// Query for servers in this client's region only
region := c.Region()
rpcargs := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Region: region,
},
}

serviceName := c.GetConfig().ConsulConfig.ServerServiceName
var mErr multierror.Error
Expand All @@ -2933,43 +2929,27 @@ DISCOLOOP:
}

for _, s := range consulServices {
port := strconv.Itoa(s.ServicePort)
addrstr := s.ServiceAddress
if addrstr == "" {
addrstr = s.Address
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(addrstr, port))
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}

// Query the members from the region that Consul gave us, and
// extract the client-advertise RPC address from each member
var membersResp structs.ServerMembersResponse
if err := c.connPool.RPC(region, addr, "Status.Members", rpcargs, &membersResp); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
for _, member := range membersResp.Members {
if addrTag, ok := member.Tags["rpc_addr"]; ok {
if portTag, ok := member.Tags["port"]; ok {
addr, err := net.ResolveTCPAddr("tcp",
fmt.Sprintf("%s:%s", addrTag, portTag))
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
srv := &servers.Server{Addr: addr}
nomadServers = append(nomadServers, srv)
}
if slices.Contains(s.ServiceTags, region) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we switch to Meta (see below) this will need to change.

However now I think we introduced ordering to our otherwise flexible upgrade procedure (at least one Server would need to be upgraded before any Clients for this to work). 🤔

But wait... what actually happens if a Client in Region A calls Node.Register on a Server in Region B?

  • If the Regions are Federated, the Node.Register RPC is Forwarded and replies with the correct server list for Region A!
  • If the Regions are not Federated, the Node.Register RPC errors and the Client will try the next Server, presumably until reaching a Server in its own Region.

...except I don't think that will happen for the following reasons: 😭

  1. The Client.registerNode() code doesn't appear to use the response it gets from Node.Register! We pluck out the HeartbeatTTL but throw away the Server list. 👎
  2. If the Regions aren't Federated, we wait up to 30s before retrying the Node.Register RPC. This means for clusters with <1,000 nodes we risk going down on Client agent restarts just from a single failed Node.Register call! 👎

Have I mentioned lately I hate all of this discovery/registration code and logic? I think it's too clever by half and makes the really critical operations like Restarting Clients really need to Register before HeartbeatTTL extremely difficult to determine/observe/tune. 😞

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the PR (and the description above) to drop the extra tags/meta on Consul. And then I've updated the registerNode method to use the servers we get back from the response. If we get no servers or a "no path to region", we'll bubble that state up to the retry loop so that it triggers discovery again and hits the fast-path retry rather than the longer retry of 15s.

port := strconv.Itoa(s.ServicePort)
addrstr := s.ServiceAddress
if addrstr == "" {
addrstr = s.Address
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(addrstr, port))
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
}

if len(nomadServers) > 0 {
break DISCOLOOP
srv := &servers.Server{Addr: addr}
nomadServers = append(nomadServers, srv)
}
}

if len(nomadServers) > 0 {
break DISCOLOOP
}

}
if len(nomadServers) == 0 {
if len(mErr.Errors) > 0 {
Expand Down
3 changes: 2 additions & 1 deletion command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,8 @@ func (a *Agent) setupServer() error {
rpcServ := &structs.Service{
Name: a.config.Consul.ServerServiceName,
PortLabel: a.config.AdvertiseAddrs.RPC,
Tags: append([]string{consul.ServiceTagRPC}, a.config.Consul.Tags...),
Tags: append([]string{consul.ServiceTagRPC, a.config.Region},
a.config.Consul.Tags...),
tgross marked this conversation as resolved.
Show resolved Hide resolved
tgross marked this conversation as resolved.
Show resolved Hide resolved
Checks: []*structs.ServiceCheck{
{
Name: a.config.Consul.ServerRPCCheckName,
Expand Down
12 changes: 10 additions & 2 deletions nomad/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,14 @@ func (s *Server) remoteIPFromRPCContext(ctx *RPCContext) (net.IP, error) {
// for the identity they intend the operation to be performed with.
func (s *Server) ResolveACL(args structs.RequestWithIdentity) (*acl.ACL, error) {
identity := args.GetIdentity()
if !s.config.ACLEnabled || identity == nil {
if !s.config.ACLEnabled {
return nil, nil
tgross marked this conversation as resolved.
Show resolved Hide resolved
}
if identity == nil {
// Server.Authenticate should never return a nil identity unless there's
// an authentication error, but enforce that invariant here
return nil, structs.ErrPermissionDenied
}
aclToken := identity.GetACLToken()
if aclToken != nil {
return s.ResolveACLForToken(aclToken)
Expand All @@ -172,7 +177,10 @@ func (s *Server) ResolveACL(args structs.RequestWithIdentity) (*acl.ACL, error)
if claims != nil {
return s.ResolveClaims(claims)
}
return nil, nil

// return an error here so that we enforce the invariant that we check for
// Identity.ClientID before trying to resolve ACLs
return nil, structs.ErrPermissionDenied
}

// ResolveACLForToken resolves an ACL from a token only. It should be used only
Expand Down