From 7dddac8e2855cbf9bdb6bf4f288dab794af50a0a Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 14 Mar 2023 16:11:11 -0400 Subject: [PATCH] client: use `Status.RPCServers` RPC for Consul discovery In #16217 we switched clients using Consul discovery to the `Status.Members` endpoint for getting the list of servers so that we're using the correct address. This endpoint has an authorization gate, so this fails if the anonymous policy doesn't have `node:read`. We also can't check the `AuthToken` for the request for the client secret, because the client hasn't yet registered so the server doesn't have anything to compare against. Create a new `Status.RPCServers` endpoint that mirrors the `Status.Peers` endpoint but provides the RPC server addresses instead of the Raft addresses. This fixes the authentication bug but also ensures we're only registering with servers in the client's region and not in any other servers that might have registered with Consul. This changeset also expands the test coverage of the RPC endpoint and closes up potential holes in the `ResolveACL` method that aren't currently bugs but easily could become bugs if we called the method without ensuring its invariants are upheld. --- .changelog/16490.txt | 3 +++ client/client.go | 33 ++++++++++++------------ nomad/acl.go | 12 +++++++-- nomad/status_endpoint.go | 27 ++++++++++++++++++++ nomad/status_endpoint_test.go | 48 +++++++++++++++++++++++++++++++++-- 5 files changed, 102 insertions(+), 21 deletions(-) create mode 100644 .changelog/16490.txt diff --git a/.changelog/16490.txt b/.changelog/16490.txt new file mode 100644 index 000000000000..42584d3f7461 --- /dev/null +++ b/.changelog/16490.txt @@ -0,0 +1,3 @@ +```release-note:bug +client: Fixed a bug where clients using Consul discovery to join the cluster would get permission denied errors +``` diff --git a/client/client.go b/client/client.go index 09b2be0db030..13981f74eab9 100644 --- a/client/client.go +++ b/client/client.go @@ -2906,7 +2906,8 @@ func (c *Client) consulDiscoveryImpl() error { dcs = dcs[0:helper.Min(len(dcs), datacenterQueryLimit)] } - // Query for servers in this client's region only + // Query for servers in this client's region only. Note this has to be an + // unauthenticated request because we haven't registered yet. region := c.Region() rpcargs := structs.GenericRequest{ QueryOptions: structs.QueryOptions{ @@ -2944,26 +2945,24 @@ DISCOLOOP: continue } - // Query the members from the region that Consul gave us, and - // extract the client-advertise RPC address from each member - var membersResp structs.ServerMembersResponse - if err := c.connPool.RPC(region, addr, "Status.Members", rpcargs, &membersResp); err != nil { + srv := &servers.Server{Addr: addr} + nomadServers = append(nomadServers, srv) + + // Query the client-advertise RPC addresses from the region that + // Consul gave us + var members []string + if err := c.connPool.RPC(region, addr, "Status.RPCServers", rpcargs, &members); err != nil { mErr.Errors = append(mErr.Errors, err) continue } - for _, member := range membersResp.Members { - if addrTag, ok := member.Tags["rpc_addr"]; ok { - if portTag, ok := member.Tags["port"]; ok { - addr, err := net.ResolveTCPAddr("tcp", - fmt.Sprintf("%s:%s", addrTag, portTag)) - if err != nil { - mErr.Errors = append(mErr.Errors, err) - continue - } - srv := &servers.Server{Addr: addr} - nomadServers = append(nomadServers, srv) - } + for _, member := range members { + addr, err := net.ResolveTCPAddr("tcp", member) + if err != nil { + mErr.Errors = append(mErr.Errors, err) + continue } + srv := &servers.Server{Addr: addr} + nomadServers = append(nomadServers, srv) } if len(nomadServers) > 0 { diff --git a/nomad/acl.go b/nomad/acl.go index f2a7a5058a4e..38947d7f834b 100644 --- a/nomad/acl.go +++ b/nomad/acl.go @@ -161,9 +161,14 @@ func (s *Server) remoteIPFromRPCContext(ctx *RPCContext) (net.IP, error) { // for the identity they intend the operation to be performed with. func (s *Server) ResolveACL(args structs.RequestWithIdentity) (*acl.ACL, error) { identity := args.GetIdentity() - if !s.config.ACLEnabled || identity == nil { + if !s.config.ACLEnabled { return nil, nil } + if identity == nil { + // Server.Authenticate should never return a nil identity unless there's + // an authentication error, but enforce that invariant here + return nil, structs.ErrPermissionDenied + } aclToken := identity.GetACLToken() if aclToken != nil { return s.ResolveACLForToken(aclToken) @@ -172,7 +177,10 @@ func (s *Server) ResolveACL(args structs.RequestWithIdentity) (*acl.ACL, error) if claims != nil { return s.ResolveClaims(claims) } - return nil, nil + + // return an error here so that we enforce the invariant that we check for + // Identity.ClientID before trying to resolve ACLs + return nil, structs.ErrPermissionDenied } // ResolveACLForToken resolves an ACL from a token only. It should be used only diff --git a/nomad/status_endpoint.go b/nomad/status_endpoint.go index f5b53a5c99c1..8207b8a66af7 100644 --- a/nomad/status_endpoint.go +++ b/nomad/status_endpoint.go @@ -78,6 +78,33 @@ func (s *Status) Peers(args *structs.GenericRequest, reply *[]string) error { return nil } +// RPCServers is used to get all the RPC server addresses in a region +func (s *Status) RPCServers(args *structs.GenericRequest, reply *[]string) error { + // note: we're intentionally throwing away any auth error here and only + // authenticate so that we can measure rate metrics + s.srv.Authenticate(s.ctx, args) + s.srv.MeasureRPCRate("status", structs.RateMetricList, args) + + if args.Region == "" { + args.Region = s.srv.config.Region + } + if done, err := s.srv.forward("Status.RPCServers", args, args, reply); done { + return err + } + + serfMembers := s.srv.Members() + for _, member := range serfMembers { + if region := member.Tags["region"]; region == args.Region { + if addrTag, ok := member.Tags["rpc_addr"]; ok { + if portTag, ok := member.Tags["port"]; ok { + *reply = append(*reply, fmt.Sprintf("%s:%s", addrTag, portTag)) + } + } + } + } + return nil +} + // Members return the list of servers in a cluster that a particular server is // aware of func (s *Status) Members(args *structs.GenericRequest, reply *structs.ServerMembersResponse) error { diff --git a/nomad/status_endpoint_test.go b/nomad/status_endpoint_test.go index e7eb6901c645..d1566a2a502b 100644 --- a/nomad/status_endpoint_test.go +++ b/nomad/status_endpoint_test.go @@ -1,17 +1,20 @@ package nomad import ( + "fmt" "testing" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/shoenig/test/must" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestStatusPing(t *testing.T) { @@ -73,6 +76,47 @@ func TestStatusPeers(t *testing.T) { } } +func TestStatus_RPCServers(t *testing.T) { + ci.Parallel(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.Region = "region1" + }) + defer cleanupS1() + + s2, cleanupS2 := TestServer(t, func(c *Config) { + c.Region = "region2" + }) + defer cleanupS2() + + // Join them together + s2Addr := fmt.Sprintf("127.0.0.1:%d", s2.config.SerfConfig.MemberlistConfig.BindPort) + n, err := s1.Join([]string{s2Addr}) + must.NoError(t, err, must.Sprintf("Failed joining: %v (%d joined)", err, n)) + + codec := rpcClient(t, s1) + + t.Run("own region", func(t *testing.T) { + arg := &structs.GenericRequest{ + QueryOptions: structs.QueryOptions{Region: "region1"}, + } + var members []string + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Status.RPCServers", arg, &members)) + must.Len(t, 1, members) + must.Eq(t, s1.clientRpcAdvertise.String(), members[0]) + }) + + t.Run("other region", func(t *testing.T) { + arg := &structs.GenericRequest{ + QueryOptions: structs.QueryOptions{Region: "region2"}, + } + var members []string + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Status.RPCServers", arg, &members)) + must.Len(t, 1, members) + must.Eq(t, s2.clientRpcAdvertise.String(), members[0]) + }) +} + func TestStatusMembers(t *testing.T) { ci.Parallel(t)