diff --git a/helper/ipaddr/ipaddr.go b/helper/ipaddr/ipaddr.go new file mode 100644 index 000000000000..e42d41c0bfb1 --- /dev/null +++ b/helper/ipaddr/ipaddr.go @@ -0,0 +1,10 @@ +package ipaddr + +// IsAny checks if the given IP address is an IPv4 or IPv6 ANY address. +func IsAny(ip string) bool { + return isAnyV4(ip) || isAnyV6(ip) +} + +func isAnyV4(ip string) bool { return ip == "0.0.0.0" } + +func isAnyV6(ip string) bool { return ip == "::" || ip == "[::]" } diff --git a/helper/ipaddr/ipaddr_test.go b/helper/ipaddr/ipaddr_test.go new file mode 100644 index 000000000000..ad64003a07a7 --- /dev/null +++ b/helper/ipaddr/ipaddr_test.go @@ -0,0 +1,53 @@ +package ipaddr + +import ( + "net" + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_IsAny(t *testing.T) { + testCases := []struct { + inputIP string + expectedOutput bool + name string + }{ + { + inputIP: "0.0.0.0", + expectedOutput: true, + name: "string ipv4 any IP", + }, + { + inputIP: "::", + expectedOutput: true, + name: "string ipv6 any IP", + }, + { + inputIP: net.IPv4zero.String(), + expectedOutput: true, + name: "net.IP ipv4 any", + }, + { + inputIP: net.IPv6zero.String(), + expectedOutput: true, + name: "net.IP ipv6 any", + }, + { + inputIP: "10.10.10.10", + expectedOutput: false, + name: "internal ipv4 address", + }, + { + inputIP: "8.8.8.8", + expectedOutput: false, + name: "public ipv4 address", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expectedOutput, IsAny(tc.inputIP)) + }) + } +} diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index 92abee62f4a5..8ed1490a615d 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -380,3 +380,67 @@ func (a *Alloc) UpdateDesiredTransition(args *structs.AllocUpdateDesiredTransiti reply.Index = index return nil } + +// GetServiceRegistrations returns a list of service registrations which belong +// to the passed allocation ID. +func (a *Alloc) GetServiceRegistrations( + args *structs.AllocServiceRegistrationsRequest, + reply *structs.AllocServiceRegistrationsResponse) error { + + if done, err := a.srv.forward(structs.AllocServiceRegistrationsRPCMethod, args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "alloc", "get_service_registrations"}, time.Now()) + + // If ACLs are enabled, ensure the caller has the read-job namespace + // capability. + aclObj, err := a.srv.ResolveToken(args.AuthToken) + if err != nil { + return err + } else if aclObj != nil { + if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) { + return structs.ErrPermissionDenied + } + } + + // Set up the blocking query. + return a.srv.blockingRPC(&blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, stateStore *state.StateStore) error { + + // Read the allocation to ensure its namespace matches the request + // args. + alloc, err := stateStore.AllocByID(ws, args.AllocID) + if err != nil { + return err + } + + // Guard against the alloc not-existing or that the namespace does + // not match the request arguments. + if alloc == nil || alloc.Namespace != args.RequestNamespace() { + return nil + } + + // Perform the state query to get an iterator. + iter, err := stateStore.GetServiceRegistrationsByAllocID(ws, args.AllocID) + if err != nil { + return err + } + + // Set up our output after we have checked the error. + services := make([]*structs.ServiceRegistration, 0) + + // Iterate the iterator, appending all service registrations + // returned to the reply. + for raw := iter.Next(); raw != nil; raw = iter.Next() { + services = append(services, raw.(*structs.ServiceRegistration)) + } + reply.Services = services + + // Use the index table to populate the query meta as we have no way + // of tracking the max index on deletes. + return a.srv.setReplyQueryMeta(stateStore, state.TableServiceRegistrations, &reply.QueryMeta) + }, + }) +} diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go index d34abfbbf2dc..ceb7aee1270f 100644 --- a/nomad/alloc_endpoint_test.go +++ b/nomad/alloc_endpoint_test.go @@ -1034,3 +1034,312 @@ func TestAllocEndpoint_List_AllNamespaces_ACL_OSS(t *testing.T) { } } + +func TestAlloc_GetServiceRegistrations(t *testing.T) { + t.Parallel() + + // This function is a helper function to set up an allocation and service + // which can be queried. + correctSetupFn := func(s *Server) (error, string, *structs.ServiceRegistration) { + // Generate an upsert an allocation. + alloc := mock.Alloc() + err := s.State().UpsertAllocs(structs.MsgTypeTestSetup, 10, []*structs.Allocation{alloc}) + if err != nil { + return nil, "", nil + } + + // Generate services. Set the allocation ID to the first, so it + // matches the allocation. The alloc and first service both + // reside in the default namespace. + services := mock.ServiceRegistrations() + services[0].AllocID = alloc.ID + err = s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services) + + return err, alloc.ID, services[0] + } + + testCases := []struct { + serverFn func(t *testing.T) (*Server, *structs.ACLToken, func()) + testFn func(t *testing.T, s *Server, token *structs.ACLToken) + name string + }{ + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + err, allocID, service := correctSetupFn(s) + require.NoError(t, err) + + // Perform a lookup on the first service. + serviceRegReq := &structs.AllocServiceRegistrationsRequest{ + AllocID: allocID, + QueryOptions: structs.QueryOptions{ + Namespace: service.Namespace, + Region: s.Region(), + }, + } + var serviceRegResp structs.AllocServiceRegistrationsResponse + err = msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.EqualValues(t, uint64(20), serviceRegResp.Index) + require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{service}) + }, + name: "ACLs disabled alloc found with regs", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert our services. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services)) + + // Perform a lookup on the first service using the allocation + // ID. This allocation does not exist within the Nomad state + // meaning the service is orphaned or the caller used an + // incorrect allocation ID. + serviceRegReq := &structs.AllocServiceRegistrationsRequest{ + AllocID: services[0].AllocID, + QueryOptions: structs.QueryOptions{ + Namespace: services[0].Namespace, + Region: s.Region(), + }, + } + var serviceRegResp structs.AllocServiceRegistrationsResponse + err := msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.Nil(t, serviceRegResp.Services) + }, + name: "ACLs disabled alloc not found", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + err, allocID, _ := correctSetupFn(s) + require.NoError(t, err) + + // Perform a lookup on the first service using the allocation + // ID but a random namespace. The namespace on the allocation + // does therefore not match the request args. + serviceRegReq := &structs.AllocServiceRegistrationsRequest{ + AllocID: allocID, + QueryOptions: structs.QueryOptions{ + Namespace: "platform", + Region: s.Region(), + }, + } + var serviceRegResp structs.AllocServiceRegistrationsResponse + err = msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{}) + }, + name: "ACLs disabled alloc found in different namespace than request", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate an upsert an allocation. + alloc := mock.Alloc() + require.NoError(t, s.State().UpsertAllocs( + structs.MsgTypeTestSetup, 10, []*structs.Allocation{alloc})) + + // Perform a lookup using the allocation information. + serviceRegReq := &structs.AllocServiceRegistrationsRequest{ + AllocID: alloc.ID, + QueryOptions: structs.QueryOptions{ + Namespace: alloc.Namespace, + Region: s.Region(), + }, + } + var serviceRegResp structs.AllocServiceRegistrationsResponse + err := msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{}) + }, + name: "ACLs disabled alloc found without regs", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + err, allocID, service := correctSetupFn(s) + require.NoError(t, err) + + // Perform a lookup using the allocation information. + serviceRegReq := &structs.AllocServiceRegistrationsRequest{ + AllocID: allocID, + QueryOptions: structs.QueryOptions{ + Namespace: service.Namespace, + Region: s.Region(), + AuthToken: token.SecretID, + }, + } + var serviceRegResp structs.AllocServiceRegistrationsResponse + err = msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{service}) + }, + name: "ACLs enabled use management token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + err, allocID, service := correctSetupFn(s) + require.NoError(t, err) + + // Create and policy and grab the auth token. + authToken := mock.CreatePolicyAndToken(t, s.State(), 30, "test-node-get-service-reg", + mock.NamespacePolicy(service.Namespace, "", []string{acl.NamespaceCapabilityReadJob})).SecretID + + // Perform a lookup using the allocation information. + serviceRegReq := &structs.AllocServiceRegistrationsRequest{ + AllocID: allocID, + QueryOptions: structs.QueryOptions{ + Namespace: service.Namespace, + Region: s.Region(), + AuthToken: authToken, + }, + } + var serviceRegResp structs.AllocServiceRegistrationsResponse + err = msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{service}) + }, + name: "ACLs enabled use read-job namespace capability token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + err, allocID, service := correctSetupFn(s) + require.NoError(t, err) + + // Create and policy and grab the auth token. + authToken := mock.CreatePolicyAndToken(t, s.State(), 30, "test-node-get-service-reg", + mock.NamespacePolicy(service.Namespace, "read", nil)).SecretID + + // Perform a lookup using the allocation information. + serviceRegReq := &structs.AllocServiceRegistrationsRequest{ + AllocID: allocID, + QueryOptions: structs.QueryOptions{ + Namespace: service.Namespace, + Region: s.Region(), + AuthToken: authToken, + }, + } + var serviceRegResp structs.AllocServiceRegistrationsResponse + err = msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{service}) + }, + name: "ACLs enabled use read namespace policy token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + err, allocID, service := correctSetupFn(s) + require.NoError(t, err) + + // Create and policy and grab the auth token. + authToken := mock.CreatePolicyAndToken(t, s.State(), 30, "test-node-get-service-reg", + mock.NamespacePolicy("ohno", "read", nil)).SecretID + + // Perform a lookup using the allocation information. + serviceRegReq := &structs.AllocServiceRegistrationsRequest{ + AllocID: allocID, + QueryOptions: structs.QueryOptions{ + Namespace: service.Namespace, + Region: s.Region(), + AuthToken: authToken, + }, + } + var serviceRegResp structs.AllocServiceRegistrationsResponse + err = msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.Error(t, err) + require.Contains(t, err.Error(), "Permission denied") + require.Empty(t, serviceRegResp.Services) + }, + name: "ACLs enabled use read incorrect namespace policy token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + err, allocID, service := correctSetupFn(s) + require.NoError(t, err) + + // Create and policy and grab the auth token. + authToken := mock.CreatePolicyAndToken(t, s.State(), 30, "test-node-get-service-reg", + mock.NamespacePolicy(service.Namespace, "", []string{acl.NamespaceCapabilityReadScalingPolicy})).SecretID + + // Perform a lookup using the allocation information. + serviceRegReq := &structs.AllocServiceRegistrationsRequest{ + AllocID: allocID, + QueryOptions: structs.QueryOptions{ + Namespace: service.Namespace, + Region: s.Region(), + AuthToken: authToken, + }, + } + var serviceRegResp structs.AllocServiceRegistrationsResponse + err = msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.Error(t, err) + require.Contains(t, err.Error(), "Permission denied") + require.Empty(t, serviceRegResp.Services) + }, + name: "ACLs enabled use incorrect capability", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + server, aclToken, cleanup := tc.serverFn(t) + defer cleanup() + tc.testFn(t, server, aclToken) + }) + } +} diff --git a/nomad/fsm.go b/nomad/fsm.go index 8f6355936e4e..b507508d68af 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -307,6 +307,12 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyOneTimeTokenDelete(msgType, buf[1:], log.Index) case structs.OneTimeTokenExpireRequestType: return n.applyOneTimeTokenExpire(msgType, buf[1:], log.Index) + case structs.ServiceRegistrationUpsertRequestType: + return n.applyUpsertServiceRegistrations(msgType, buf[1:], log.Index) + case structs.ServiceRegistrationDeleteByIDRequestType: + return n.applyDeleteServiceRegistrationByID(msgType, buf[1:], log.Index) + case structs.ServiceRegistrationDeleteByNodeIDRequestType: + return n.applyDeleteServiceRegistrationByNodeID(msgType, buf[1:], log.Index) } // Check enterprise only message types. @@ -1894,6 +1900,51 @@ func (n *nomadFSM) applyUpsertScalingEvent(buf []byte, index uint64) interface{} return nil } +func (n *nomadFSM) applyUpsertServiceRegistrations(msgType structs.MessageType, buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_service_registration_upsert"}, time.Now()) + var req structs.ServiceRegistrationUpsertRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.UpsertServiceRegistrations(msgType, index, req.Services); err != nil { + n.logger.Error("UpsertServiceRegistrations failed", "error", err) + return err + } + + return nil +} + +func (n *nomadFSM) applyDeleteServiceRegistrationByID(msgType structs.MessageType, buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_service_registration_delete_id"}, time.Now()) + var req structs.ServiceRegistrationDeleteByIDRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.DeleteServiceRegistrationByID(msgType, index, req.RequestNamespace(), req.ID); err != nil { + n.logger.Error("DeleteServiceRegistrationByID failed", "error", err) + return err + } + + return nil +} + +func (n *nomadFSM) applyDeleteServiceRegistrationByNodeID(msgType structs.MessageType, buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_service_registration_delete_node_id"}, time.Now()) + var req structs.ServiceRegistrationDeleteByNodeIDRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.DeleteServiceRegistrationByNodeID(msgType, index, req.NodeID); err != nil { + n.logger.Error("DeleteServiceRegistrationByNodeID failed", "error", err) + return err + } + + return nil +} + func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now()) // Register the nodes diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 42b3a7e25c75..f007d82b949a 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -3259,6 +3259,87 @@ func TestFSM_SnapshotRestore_Namespaces(t *testing.T) { } } +func TestFSM_UpsertServiceRegistrations(t *testing.T) { + t.Parallel() + fsm := testFSM(t) + + // Generate our test service registrations. + services := mock.ServiceRegistrations() + + // Build and apply our message. + req := structs.ServiceRegistrationUpsertRequest{Services: services} + buf, err := structs.Encode(structs.ServiceRegistrationUpsertRequestType, req) + assert.Nil(t, err) + assert.Nil(t, fsm.Apply(makeLog(buf))) + + // Check that both services are found within state. + ws := memdb.NewWatchSet() + out, err := fsm.State().GetServiceRegistrationByID(ws, services[0].Namespace, services[0].ID) + assert.Nil(t, err) + assert.NotNil(t, out) + + out, err = fsm.State().GetServiceRegistrationByID(ws, services[1].Namespace, services[1].ID) + assert.Nil(t, err) + assert.NotNil(t, out) +} + +func TestFSM_DeleteServiceRegistrationsByID(t *testing.T) { + t.Parallel() + fsm := testFSM(t) + + // Generate our test service registrations. + services := mock.ServiceRegistrations() + + // Upsert the services. + assert.NoError(t, fsm.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, uint64(10), services)) + + // Build and apply our message. + req := structs.ServiceRegistrationDeleteByIDRequest{ID: services[0].ID} + buf, err := structs.Encode(structs.ServiceRegistrationDeleteByIDRequestType, req) + assert.Nil(t, err) + assert.Nil(t, fsm.Apply(makeLog(buf))) + + // Check that the service has been deleted, whilst the other is still + // available. + ws := memdb.NewWatchSet() + out, err := fsm.State().GetServiceRegistrationByID(ws, services[0].Namespace, services[0].ID) + assert.Nil(t, err) + assert.Nil(t, out) + + out, err = fsm.State().GetServiceRegistrationByID(ws, services[1].Namespace, services[1].ID) + assert.Nil(t, err) + assert.NotNil(t, out) +} + +func TestFSM_DeleteServiceRegistrationsByNodeID(t *testing.T) { + t.Parallel() + fsm := testFSM(t) + + // Generate our test service registrations. Set them both to have the same + // node ID. + services := mock.ServiceRegistrations() + services[1].NodeID = services[0].NodeID + + // Upsert the services. + assert.NoError(t, fsm.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, uint64(10), services)) + + // Build and apply our message. + req := structs.ServiceRegistrationDeleteByNodeIDRequest{NodeID: services[0].NodeID} + buf, err := structs.Encode(structs.ServiceRegistrationDeleteByNodeIDRequestType, req) + assert.Nil(t, err) + assert.Nil(t, fsm.Apply(makeLog(buf))) + + // Check both services have been removed. + ws := memdb.NewWatchSet() + out, err := fsm.State().GetServiceRegistrationByID(ws, services[0].Namespace, services[0].ID) + assert.Nil(t, err) + assert.Nil(t, out) + + out, err = fsm.State().GetServiceRegistrationByID(ws, services[1].Namespace, services[1].ID) + assert.Nil(t, err) + assert.Nil(t, out) +} + func TestFSM_ACLEvents(t *testing.T) { t.Parallel() diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 418cbe1afa3f..a26600290bc8 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -2217,3 +2217,65 @@ func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest, }} return j.srv.blockingRPC(&opts) } + +// GetServiceRegistrations returns a list of service registrations which belong +// to the passed job ID. +func (j *Job) GetServiceRegistrations( + args *structs.JobServiceRegistrationsRequest, + reply *structs.JobServiceRegistrationsResponse) error { + + if done, err := j.srv.forward(structs.JobServiceRegistrationsRPCMethod, args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "job", "get_service_registrations"}, time.Now()) + + // If ACLs are enabled, ensure the caller has the read-job namespace + // capability. + aclObj, err := j.srv.ResolveToken(args.AuthToken) + if err != nil { + return err + } else if aclObj != nil { + if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) { + return structs.ErrPermissionDenied + } + } + + // Set up the blocking query. + return j.srv.blockingRPC(&blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, stateStore *state.StateStore) error { + + job, err := stateStore.JobByID(ws, args.RequestNamespace(), args.JobID) + if err != nil { + return err + } + + // Guard against the job not-existing. Do not create an empty list + // to allow the API to determine whether the job was found or not. + if job == nil { + return nil + } + + // Perform the state query to get an iterator. + iter, err := stateStore.GetServiceRegistrationsByJobID(ws, args.RequestNamespace(), args.JobID) + if err != nil { + return err + } + + // Set up our output after we have checked the error. + services := make([]*structs.ServiceRegistration, 0) + + // Iterate the iterator, appending all service registrations + // returned to the reply. + for raw := iter.Next(); raw != nil; raw = iter.Next() { + services = append(services, raw.(*structs.ServiceRegistration)) + } + reply.Services = services + + // Use the index table to populate the query meta as we have no way + // of tracking the max index on deletes. + return j.srv.setReplyQueryMeta(stateStore, state.TableServiceRegistrations, &reply.QueryMeta) + }, + }) +} diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 3850290fa4f7..e844578faa23 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -7740,3 +7740,280 @@ func TestJobEndpoint_GetScaleStatus_ACL(t *testing.T) { require.NotNil(validResp.JobScaleStatus) } } + +func TestJob_GetServiceRegistrations(t *testing.T) { + t.Parallel() + + // This function is a helper function to set up job and service which can + // be queried. + correctSetupFn := func(s *Server) (error, string, *structs.ServiceRegistration) { + // Generate an upsert a job. + job := mock.Job() + err := s.State().UpsertJob(structs.MsgTypeTestSetup, 10, job) + if err != nil { + return nil, "", nil + } + + // Generate services. Set the jobID on the first service so this + // matches the job now held in state. + services := mock.ServiceRegistrations() + services[0].JobID = job.ID + err = s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services) + + return err, job.ID, services[0] + } + + testCases := []struct { + serverFn func(t *testing.T) (*Server, *structs.ACLToken, func()) + testFn func(t *testing.T, s *Server, token *structs.ACLToken) + name string + }{ + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + err, jobID, service := correctSetupFn(s) + require.NoError(t, err) + + // Perform a lookup and test the response. + serviceRegReq := &structs.JobServiceRegistrationsRequest{ + JobID: jobID, + QueryOptions: structs.QueryOptions{ + Namespace: service.Namespace, + Region: s.Region(), + }, + } + var serviceRegResp structs.JobServiceRegistrationsResponse + err = msgpackrpc.CallWithCodec(codec, structs.JobServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.EqualValues(t, uint64(20), serviceRegResp.Index) + require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{service}) + }, + name: "ACLs disabled job found with regs", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert our services. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services)) + + // Perform a lookup on the first service using the job ID. This + // job does not exist within the Nomad state meaning the + // service is orphaned or the caller used an incorrect job ID. + serviceRegReq := &structs.JobServiceRegistrationsRequest{ + JobID: services[0].JobID, + QueryOptions: structs.QueryOptions{ + Namespace: services[0].Namespace, + Region: s.Region(), + }, + } + var serviceRegResp structs.JobServiceRegistrationsResponse + err := msgpackrpc.CallWithCodec(codec, structs.JobServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.Nil(t, serviceRegResp.Services) + }, + name: "ACLs disabled job not found", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate an upsert a job. + job := mock.Job() + require.NoError(t, s.State().UpsertJob(structs.MsgTypeTestSetup, 10, job)) + + // Perform a lookup and test the response. + serviceRegReq := &structs.JobServiceRegistrationsRequest{ + JobID: job.ID, + QueryOptions: structs.QueryOptions{ + Namespace: job.Namespace, + Region: s.Region(), + }, + } + var serviceRegResp structs.JobServiceRegistrationsResponse + err := msgpackrpc.CallWithCodec(codec, structs.JobServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{}) + }, + name: "ACLs disabled job found without regs", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + err, jobID, service := correctSetupFn(s) + require.NoError(t, err) + + // Perform a lookup and test the response. + serviceRegReq := &structs.JobServiceRegistrationsRequest{ + JobID: jobID, + QueryOptions: structs.QueryOptions{ + Namespace: service.Namespace, + Region: s.Region(), + AuthToken: token.SecretID, + }, + } + var serviceRegResp structs.JobServiceRegistrationsResponse + err = msgpackrpc.CallWithCodec(codec, structs.JobServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{service}) + }, + name: "ACLs enabled use management token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + err, jobID, service := correctSetupFn(s) + require.NoError(t, err) + + // Create and policy and grab the auth token. + authToken := mock.CreatePolicyAndToken(t, s.State(), 30, "test-node-get-service-reg", + mock.NamespacePolicy(service.Namespace, "", []string{acl.NamespaceCapabilityReadJob})).SecretID + + // Perform a lookup and test the response. + serviceRegReq := &structs.JobServiceRegistrationsRequest{ + JobID: jobID, + QueryOptions: structs.QueryOptions{ + Namespace: service.Namespace, + Region: s.Region(), + AuthToken: authToken, + }, + } + var serviceRegResp structs.JobServiceRegistrationsResponse + err = msgpackrpc.CallWithCodec(codec, structs.JobServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{service}) + }, + name: "ACLs enabled use read-job namespace capability token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + err, jobID, service := correctSetupFn(s) + require.NoError(t, err) + + // Create and policy and grab the auth token. + authToken := mock.CreatePolicyAndToken(t, s.State(), 30, "test-node-get-service-reg", + mock.NamespacePolicy(service.Namespace, "read", nil)).SecretID + + // Perform a lookup and test the response. + serviceRegReq := &structs.JobServiceRegistrationsRequest{ + JobID: jobID, + QueryOptions: structs.QueryOptions{ + Namespace: service.Namespace, + Region: s.Region(), + AuthToken: authToken, + }, + } + var serviceRegResp structs.JobServiceRegistrationsResponse + err = msgpackrpc.CallWithCodec(codec, structs.JobServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{service}) + }, + name: "ACLs enabled use read namespace policy token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + err, jobID, service := correctSetupFn(s) + require.NoError(t, err) + + // Create and policy and grab the auth token. + authToken := mock.CreatePolicyAndToken(t, s.State(), 30, "test-node-get-service-reg", + mock.NamespacePolicy("ohno", "read", nil)).SecretID + + // Perform a lookup and test the response. + serviceRegReq := &structs.JobServiceRegistrationsRequest{ + JobID: jobID, + QueryOptions: structs.QueryOptions{ + Namespace: service.Namespace, + Region: s.Region(), + AuthToken: authToken, + }, + } + var serviceRegResp structs.JobServiceRegistrationsResponse + err = msgpackrpc.CallWithCodec(codec, structs.JobServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.Error(t, err) + require.Contains(t, err.Error(), "Permission denied") + require.Empty(t, serviceRegResp.Services) + }, + name: "ACLs enabled use read incorrect namespace policy token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + err, jobID, service := correctSetupFn(s) + require.NoError(t, err) + + // Create and policy and grab the auth token. + authToken := mock.CreatePolicyAndToken(t, s.State(), 30, "test-node-get-service-reg", + mock.NamespacePolicy(service.Namespace, "", []string{acl.NamespaceCapabilityReadScalingPolicy})).SecretID + + // Perform a lookup and test the response. + serviceRegReq := &structs.JobServiceRegistrationsRequest{ + JobID: jobID, + QueryOptions: structs.QueryOptions{ + Namespace: service.Namespace, + Region: s.Region(), + AuthToken: authToken, + }, + } + var serviceRegResp structs.JobServiceRegistrationsResponse + err = msgpackrpc.CallWithCodec(codec, structs.JobServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp) + require.Error(t, err) + require.Contains(t, err.Error(), "Permission denied") + require.Empty(t, serviceRegResp.Services) + }, + name: "ACLs enabled use incorrect capability", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + server, aclToken, cleanup := tc.serverFn(t) + defer cleanup() + tc.testFn(t, server, aclToken) + }) + } +} diff --git a/nomad/rpc_test.go b/nomad/rpc_test.go index bd738f2793a3..98912bfe4944 100644 --- a/nomad/rpc_test.go +++ b/nomad/rpc_test.go @@ -1156,6 +1156,9 @@ func TestRPC_TLS_Enforcement_RPC(t *testing.T) { "Node.UpdateAlloc": &structs.AllocUpdateRequest{ WriteRequest: structs.WriteRequest{Region: "global"}, }, + "ServiceRegistration.Upsert": &structs.ServiceRegistrationUpsertRequest{ + WriteRequest: structs.WriteRequest{Region: "global"}, + }, } // When VerifyServerHostname is enabled: diff --git a/nomad/server.go b/nomad/server.go index 5e3d2eb51ac2..df75baa27d63 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -264,22 +264,23 @@ type Server struct { // Holds the RPC endpoints type endpoints struct { - Status *Status - Node *Node - Job *Job - CSIVolume *CSIVolume - CSIPlugin *CSIPlugin - Deployment *Deployment - Region *Region - Search *Search - Periodic *Periodic - System *System - Operator *Operator - ACL *ACL - Scaling *Scaling - Enterprise *EnterpriseEndpoints - Event *Event - Namespace *Namespace + Status *Status + Node *Node + Job *Job + CSIVolume *CSIVolume + CSIPlugin *CSIPlugin + Deployment *Deployment + Region *Region + Search *Search + Periodic *Periodic + System *System + Operator *Operator + ACL *ACL + Scaling *Scaling + Enterprise *EnterpriseEndpoints + Event *Event + Namespace *Namespace + ServiceRegistration *ServiceRegistration // Client endpoints ClientStats *ClientStats @@ -1167,6 +1168,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { // register them as static. s.staticEndpoints.Deployment = &Deployment{srv: s, logger: s.logger.Named("deployment")} s.staticEndpoints.Node = &Node{srv: s, logger: s.logger.Named("client")} + s.staticEndpoints.ServiceRegistration = &ServiceRegistration{srv: s} // Client endpoints s.staticEndpoints.ClientStats = &ClientStats{srv: s, logger: s.logger.Named("client_stats")} @@ -1212,6 +1214,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { eval := &Eval{srv: s, ctx: ctx, logger: s.logger.Named("eval")} node := &Node{srv: s, ctx: ctx, logger: s.logger.Named("client")} plan := &Plan{srv: s, ctx: ctx, logger: s.logger.Named("plan")} + serviceReg := &ServiceRegistration{srv: s, ctx: ctx} // Register the dynamic endpoints server.Register(alloc) @@ -1219,6 +1222,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { server.Register(eval) server.Register(node) server.Register(plan) + _ = server.Register(serviceReg) } // setupRaft is used to setup and initialize Raft @@ -1885,6 +1889,32 @@ func (s *Server) EmitRaftStats(period time.Duration, stopCh <-chan struct{}) { } } +// setReplyQueryMeta is an RPC helper function to properly populate the query +// meta for a read response. It populates the index using a floored value +// obtained from the index table as well as leader and last contact +// information. +// +// If the passed state.StateStore is nil, a new handle is obtained. +func (s *Server) setReplyQueryMeta(stateStore *state.StateStore, table string, reply *structs.QueryMeta) error { + + // Protect against an empty stateStore object to avoid panic. + if stateStore == nil { + stateStore = s.fsm.State() + } + + // Get the index from the index table and ensure the value is floored to at + // least one. + index, err := stateStore.Index(table) + if err != nil { + return err + } + reply.Index = helper.Uint64Max(1, index) + + // Set the query response. + s.setQueryMeta(reply) + return nil +} + // Region returns the region of the server func (s *Server) Region() string { return s.config.Region diff --git a/nomad/service_registration_endpoint.go b/nomad/service_registration_endpoint.go new file mode 100644 index 000000000000..97021eba7bb6 --- /dev/null +++ b/nomad/service_registration_endpoint.go @@ -0,0 +1,417 @@ +package nomad + +import ( + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" +) + +// ServiceRegistration encapsulates the service registrations RPC endpoint +// which is callable via the ServiceRegistration RPCs and externally via the +// "/v1/service{s}" HTTP API. +type ServiceRegistration struct { + srv *Server + + // ctx provides context regarding the underlying connection, so we can + // perform TLS certificate validation on internal only endpoints. + ctx *RPCContext +} + +// Upsert creates or updates service registrations held within Nomad. This RPC +// is only callable by Nomad nodes. +func (s *ServiceRegistration) Upsert( + args *structs.ServiceRegistrationUpsertRequest, + reply *structs.ServiceRegistrationUpsertResponse) error { + + // Ensure the connection was initiated by a client if TLS is used. + if err := validateTLSCertificateLevel(s.srv, s.ctx, tlsCertificateLevelClient); err != nil { + return err + } + + if done, err := s.srv.forward(structs.ServiceRegistrationUpsertRPCMethod, args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "service_registration", "upsert"}, time.Now()) + + // This endpoint is only callable by nodes in the cluster. Therefore, + // perform a node lookup using the secret ID to confirm the caller is a + // known node. + node, err := s.srv.fsm.State().NodeBySecretID(nil, args.AuthToken) + if err != nil { + return err + } + if node == nil { + return structs.ErrTokenNotFound + } + + // Use a multierror, so we can capture all validation errors and pass this + // back so fixing in a single swoop. + var mErr multierror.Error + + // Iterate the services and validate them. Any error results in the call + // failing. + for _, service := range args.Services { + if err := service.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + if err := mErr.ErrorOrNil(); err != nil { + return err + } + + // Update via Raft. + out, index, err := s.srv.raftApply(structs.ServiceRegistrationUpsertRequestType, args) + if err != nil { + return err + } + + // Check if the FSM response, which is an interface, contains an error. + if err, ok := out.(error); ok && err != nil { + return err + } + + // Update the index. There is no need to floor this as we are writing to + // state and therefore will get a non-zero index response. + reply.Index = index + return nil +} + +// DeleteByID removes a single service registration, as specified by its ID +// from Nomad. This is typically called by Nomad nodes, however, in extreme +// situations can be used via the CLI and API by operators. +func (s *ServiceRegistration) DeleteByID( + args *structs.ServiceRegistrationDeleteByIDRequest, + reply *structs.ServiceRegistrationDeleteByIDResponse) error { + + if done, err := s.srv.forward(structs.ServiceRegistrationDeleteByIDRPCMethod, args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "service_registration", "delete_id"}, time.Now()) + + // Perform the ACL token resolution. + aclObj, err := s.srv.ResolveToken(args.AuthToken) + + switch err { + case nil: + // If ACLs are enabled, ensure the caller has the submit-job namespace + // capability. + if aclObj != nil { + hasSubmitJob := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) + if !hasSubmitJob { + return structs.ErrPermissionDenied + } + } + default: + // This endpoint is generally called by Nomad nodes, so we want to + // perform this check, unless the token resolution gave us a terminal + // error. + if err != structs.ErrTokenNotFound { + return err + } + + // Attempt to lookup AuthToken as a Node.SecretID and return any error + // wrapped along with the original. + node, stateErr := s.srv.fsm.State().NodeBySecretID(nil, args.AuthToken) + if stateErr != nil { + var mErr multierror.Error + mErr.Errors = append(mErr.Errors, err, stateErr) + return mErr.ErrorOrNil() + } + + // At this point, we do not have a valid ACL token, nor are we being + // called, or able to confirm via the state store, by a node. + if node == nil { + return structs.ErrTokenNotFound + } + } + + // Update via Raft. + out, index, err := s.srv.raftApply(structs.ServiceRegistrationDeleteByIDRequestType, args) + if err != nil { + return err + } + + // Check if the FSM response, which is an interface, contains an error. + if err, ok := out.(error); ok && err != nil { + return err + } + + // Update the index. There is no need to floor this as we are writing to + // state and therefore will get a non-zero index response. + reply.Index = index + return nil +} + +// List is used to list service registration held within state. It supports +// single and wildcard namespace listings. +func (s *ServiceRegistration) List( + args *structs.ServiceRegistrationListRequest, + reply *structs.ServiceRegistrationListResponse) error { + + if done, err := s.srv.forward(structs.ServiceRegistrationListRPCMethod, args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "service_registration", "list"}, time.Now()) + + // If the caller has requested to list services across all namespaces, use + // the custom function to perform this. + if args.RequestNamespace() == structs.AllNamespacesSentinel { + return s.listAllServiceRegistrations(args, reply) + } + + // If ACLs are enabled, ensure the caller has the read-job namespace + // capability. + if aclObj, err := s.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil { + if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) { + return structs.ErrPermissionDenied + } + } + + // Set up and return the blocking query. + return s.srv.blockingRPC(&blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, stateStore *state.StateStore) error { + + // Perform the state query to get an iterator. + iter, err := stateStore.GetServiceRegistrationsByNamespace(ws, args.RequestNamespace()) + if err != nil { + return err + } + + // Track the unique tags found per service registration name. + serviceTags := make(map[string]map[string]struct{}) + + for raw := iter.Next(); raw != nil; raw = iter.Next() { + + serviceReg := raw.(*structs.ServiceRegistration) + + // Identify and add any tags for the current service being + // iterated into the map. If the tag has already been seen for + // the same service, it will be overwritten ensuring no + // duplicates. + tags, ok := serviceTags[serviceReg.ServiceName] + if !ok { + serviceTags[serviceReg.ServiceName] = make(map[string]struct{}) + tags = serviceTags[serviceReg.ServiceName] + } + for _, tag := range serviceReg.Tags { + tags[tag] = struct{}{} + } + } + + var serviceList []*structs.ServiceRegistrationStub + + // Iterate the serviceTags map and populate our output result. This + // endpoint handles a single namespace, so we do not need to + // account for multiple. + for service, tags := range serviceTags { + + serviceStub := structs.ServiceRegistrationStub{ + ServiceName: service, + Tags: make([]string, 0, len(tags)), + } + for tag := range tags { + serviceStub.Tags = append(serviceStub.Tags, tag) + } + + serviceList = append(serviceList, &serviceStub) + } + + // Correctly handle situations where a namespace was passed that + // either does not contain service registrations, or might not even + // exist. + if len(serviceList) > 0 { + reply.Services = []*structs.ServiceRegistrationListStub{ + { + Namespace: args.RequestNamespace(), + Services: serviceList, + }, + } + } else { + reply.Services = make([]*structs.ServiceRegistrationListStub, 0) + } + + // Use the index table to populate the query meta as we have no way + // of tracking the max index on deletes. + return s.srv.setReplyQueryMeta(stateStore, state.TableServiceRegistrations, &reply.QueryMeta) + }, + }) +} + +// listAllServiceRegistrations is used to list service registration held within +// state where the caller has used the namespace wildcard identifier. +func (s *ServiceRegistration) listAllServiceRegistrations( + args *structs.ServiceRegistrationListRequest, + reply *structs.ServiceRegistrationListResponse) error { + + // Perform token resolution. The request already goes through forwarding + // and metrics setup before being called. + aclObj, err := s.srv.ResolveToken(args.AuthToken) + if err != nil { + return err + } + + // allowFunc checks whether the caller has the read-job capability on the + // passed namespace. + allowFunc := func(ns string) bool { + return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob) + } + + // Set up and return the blocking query. + return s.srv.blockingRPC(&blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, stateStore *state.StateStore) error { + + // Identify which namespaces the caller has access to. If they do + // not have access to any, send them an empty response. Otherwise, + // handle any error in a traditional manner. + allowedNSes, err := allowedNSes(aclObj, stateStore, allowFunc) + switch err { + case structs.ErrPermissionDenied: + reply.Services = make([]*structs.ServiceRegistrationListStub, 0) + return nil + case nil: + // Fallthrough. + default: + return err + } + + // Get all the service registrations stored within state. + iter, err := stateStore.GetServiceRegistrations(ws) + if err != nil { + return err + } + + // Track the unique tags found per namespace per service + // registration name. + namespacedServiceTags := make(map[string]map[string]map[string]struct{}) + + // Iterate all service registrations. + for raw := iter.Next(); raw != nil; raw = iter.Next() { + + // We need to assert the type here in order to check the + // namespace. + serviceReg := raw.(*structs.ServiceRegistration) + + // Check whether the service registration is within a namespace + // the caller is permitted to view. nil allowedNSes means the + // caller can view all namespaces. + if allowedNSes != nil && !allowedNSes[serviceReg.Namespace] { + continue + } + + // Identify and add any tags for the current namespaced service + // being iterated into the map. If the tag has already been + // seen for the same service, it will be overwritten ensuring + // no duplicates. + namespace, ok := namespacedServiceTags[serviceReg.Namespace] + if !ok { + namespacedServiceTags[serviceReg.Namespace] = make(map[string]map[string]struct{}) + namespace = namespacedServiceTags[serviceReg.Namespace] + } + tags, ok := namespace[serviceReg.ServiceName] + if !ok { + namespace[serviceReg.ServiceName] = make(map[string]struct{}) + tags = namespace[serviceReg.ServiceName] + } + for _, tag := range serviceReg.Tags { + tags[tag] = struct{}{} + } + } + + // Set up our output object. Start with zero size but allocate the + // know length as we wil need to append whilst avoid slice growing. + servicesOutput := make([]*structs.ServiceRegistrationListStub, 0, len(namespacedServiceTags)) + + for ns, serviceTags := range namespacedServiceTags { + + var serviceList []*structs.ServiceRegistrationStub + + // Iterate the serviceTags map and populate our output result. + for service, tags := range serviceTags { + + serviceStub := structs.ServiceRegistrationStub{ + ServiceName: service, + Tags: make([]string, 0, len(tags)), + } + for tag := range tags { + serviceStub.Tags = append(serviceStub.Tags, tag) + } + + serviceList = append(serviceList, &serviceStub) + } + + servicesOutput = append(servicesOutput, &structs.ServiceRegistrationListStub{ + Namespace: ns, + Services: serviceList, + }) + } + + // Add the output to the reply object. + reply.Services = servicesOutput + + // Use the index table to populate the query meta as we have no way + // of tracking the max index on deletes. + return s.srv.setReplyQueryMeta(stateStore, state.TableServiceRegistrations, &reply.QueryMeta) + }, + }) +} + +// GetService is used to get all services registrations corresponding to a +// single name. +func (s *ServiceRegistration) GetService( + args *structs.ServiceRegistrationByNameRequest, + reply *structs.ServiceRegistrationByNameResponse) error { + + if done, err := s.srv.forward(structs.ServiceRegistrationGetServiceRPCMethod, args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "service_registration", "get_service"}, time.Now()) + + // If ACLs are enabled, ensure the caller has the read-job namespace + // capability. + if aclObj, err := s.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil { + if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) { + return structs.ErrPermissionDenied + } + } + + // Set up the blocking query. + return s.srv.blockingRPC(&blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, stateStore *state.StateStore) error { + + // Perform the state query to get an iterator. + iter, err := stateStore.GetServiceRegistrationByName(ws, args.RequestNamespace(), args.ServiceName) + if err != nil { + return err + } + + // Set up our output after we have checked the error. + var services []*structs.ServiceRegistration + + // Iterate the iterator, appending all service registrations + // returned to the reply. + for raw := iter.Next(); raw != nil; raw = iter.Next() { + services = append(services, raw.(*structs.ServiceRegistration)) + } + reply.Services = services + + // Use the index table to populate the query meta as we have no way + // of tracking the max index on deletes. + return s.srv.setReplyQueryMeta(stateStore, state.TableServiceRegistrations, &reply.QueryMeta) + }, + }) +} diff --git a/nomad/service_registration_endpoint_test.go b/nomad/service_registration_endpoint_test.go new file mode 100644 index 000000000000..cff3473119d5 --- /dev/null +++ b/nomad/service_registration_endpoint_test.go @@ -0,0 +1,974 @@ +package nomad + +import ( + "testing" + + "github.com/hashicorp/go-memdb" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" +) + +func TestServiceRegistration_Upsert(t *testing.T) { + t.Parallel() + + testCases := []struct { + serverFn func(t *testing.T) (*Server, *structs.ACLToken, func()) + testFn func(t *testing.T, s *Server, token *structs.ACLToken) + name string + }{ + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations and ensure + // they are in the same namespace. + services := mock.ServiceRegistrations() + services[1].Namespace = services[0].Namespace + + // Attempt to upsert without a token. + serviceRegReq := &structs.ServiceRegistrationUpsertRequest{ + Services: services, + WriteRequest: structs.WriteRequest{ + Region: DefaultRegion, + Namespace: services[0].Namespace, + }, + } + var serviceRegResp structs.ServiceRegistrationUpsertResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationUpsertRPCMethod, serviceRegReq, &serviceRegResp) + require.Error(t, err) + require.Contains(t, err.Error(), "node lookup by SecretID failed") + + // Generate a node and retry the upsert. + node := mock.Node() + require.NoError(t, s.State().UpsertNode(structs.MsgTypeTestSetup, 10, node)) + + ws := memdb.NewWatchSet() + node, err = s.State().NodeByID(ws, node.ID) + require.NoError(t, err) + require.NotNil(t, node) + + serviceRegReq.WriteRequest.AuthToken = node.SecretID + err = msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationUpsertRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.Greater(t, serviceRegResp.Index, uint64(1)) + }, + name: "ACLs disabled without node secret", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations and ensure + // they are in the same namespace. + services := mock.ServiceRegistrations() + services[1].Namespace = services[0].Namespace + + // Generate a node. + node := mock.Node() + require.NoError(t, s.State().UpsertNode(structs.MsgTypeTestSetup, 10, node)) + + ws := memdb.NewWatchSet() + node, err := s.State().NodeByID(ws, node.ID) + require.NoError(t, err) + require.NotNil(t, node) + + serviceRegReq := &structs.ServiceRegistrationUpsertRequest{ + Services: services, + WriteRequest: structs.WriteRequest{ + Region: DefaultRegion, + Namespace: services[0].Namespace, + AuthToken: node.SecretID, + }, + } + var serviceRegResp structs.ServiceRegistrationUpsertResponse + err = msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationUpsertRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.Greater(t, serviceRegResp.Index, uint64(1)) + }, + name: "ACLs disabled with node secret", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations and ensure + // they are in the same namespace. + services := mock.ServiceRegistrations() + services[1].Namespace = services[0].Namespace + + // Attempt to upsert without a token. + serviceRegReq := &structs.ServiceRegistrationUpsertRequest{ + Services: services, + WriteRequest: structs.WriteRequest{ + Region: DefaultRegion, + Namespace: services[0].Namespace, + }, + } + var serviceRegResp structs.ServiceRegistrationUpsertResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationUpsertRPCMethod, serviceRegReq, &serviceRegResp) + require.Error(t, err) + require.Contains(t, err.Error(), "node lookup by SecretID failed") + + // Generate a node and retry the upsert. + node := mock.Node() + require.NoError(t, s.State().UpsertNode(structs.MsgTypeTestSetup, 10, node)) + + ws := memdb.NewWatchSet() + node, err = s.State().NodeByID(ws, node.ID) + require.NoError(t, err) + require.NotNil(t, node) + + serviceRegReq.WriteRequest.AuthToken = node.SecretID + err = msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationUpsertRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.Greater(t, serviceRegResp.Index, uint64(1)) + }, + name: "ACLs enabled without node secret", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations and ensure + // they are in the same namespace. + services := mock.ServiceRegistrations() + services[1].Namespace = services[0].Namespace + + // Generate a node. + node := mock.Node() + require.NoError(t, s.State().UpsertNode(structs.MsgTypeTestSetup, 10, node)) + + ws := memdb.NewWatchSet() + node, err := s.State().NodeByID(ws, node.ID) + require.NoError(t, err) + require.NotNil(t, node) + + serviceRegReq := &structs.ServiceRegistrationUpsertRequest{ + Services: services, + WriteRequest: structs.WriteRequest{ + Region: DefaultRegion, + Namespace: services[0].Namespace, + AuthToken: node.SecretID, + }, + } + var serviceRegResp structs.ServiceRegistrationUpsertResponse + err = msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationUpsertRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.Greater(t, serviceRegResp.Index, uint64(1)) + }, + name: "ACLs enabled with node secret", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + server, aclToken, cleanup := tc.serverFn(t) + defer cleanup() + tc.testFn(t, server, aclToken) + }) + } +} + +func TestServiceRegistration_DeleteByID(t *testing.T) { + t.Parallel() + + testCases := []struct { + serverFn func(t *testing.T) (*Server, *structs.ACLToken, func()) + testFn func(t *testing.T, s *Server, token *structs.ACLToken) + name string + }{ + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Attempt to delete a service registration that does not + // exist. + serviceRegReq := &structs.ServiceRegistrationDeleteByIDRequest{ + ID: "this-is-not-the-service-you're-looking-for", + WriteRequest: structs.WriteRequest{ + Region: DefaultRegion, + Namespace: "default", + }, + } + + var serviceRegResp structs.ServiceRegistrationDeleteByIDResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationDeleteByIDRPCMethod, serviceRegReq, &serviceRegResp) + require.Error(t, err) + require.Contains(t, err.Error(), "service registration not found") + }, + name: "ACLs disabled unknown service", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + // Try and delete one of the services that exist. + serviceRegReq := &structs.ServiceRegistrationDeleteByIDRequest{ + ID: services[0].ID, + WriteRequest: structs.WriteRequest{ + Region: DefaultRegion, + Namespace: services[0].Namespace, + }, + } + + var serviceRegResp structs.ServiceRegistrationDeleteByIDResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationDeleteByIDRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + }, + name: "ACLs disabled known service", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + // Try and delete one of the services that exist. + serviceRegReq := &structs.ServiceRegistrationDeleteByIDRequest{ + ID: services[0].ID, + WriteRequest: structs.WriteRequest{ + Region: DefaultRegion, + Namespace: services[0].Namespace, + AuthToken: token.SecretID, + }, + } + + var serviceRegResp structs.ServiceRegistrationDeleteByIDResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationDeleteByIDRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + }, + name: "ACLs enabled known service with management token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + // Try and delete one of the services that exist but don't set + // an auth token. + serviceRegReq := &structs.ServiceRegistrationDeleteByIDRequest{ + ID: services[0].ID, + WriteRequest: structs.WriteRequest{ + Region: DefaultRegion, + Namespace: services[0].Namespace, + }, + } + + var serviceRegResp structs.ServiceRegistrationDeleteByIDResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationDeleteByIDRPCMethod, serviceRegReq, &serviceRegResp) + require.Error(t, err) + require.Contains(t, err.Error(), "Permission denied") + }, + name: "ACLs enabled known service without token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + // Create a token using submit-job capability. + authToken := mock.CreatePolicyAndToken(t, s.State(), 30, "test-service-reg-delete", + mock.NamespacePolicy(services[0].Namespace, "", []string{acl.NamespaceCapabilitySubmitJob})).SecretID + + // Try and delete one of the services that exist but don't set + // an auth token. + serviceRegReq := &structs.ServiceRegistrationDeleteByIDRequest{ + ID: services[0].ID, + WriteRequest: structs.WriteRequest{ + Region: DefaultRegion, + Namespace: services[0].Namespace, + AuthToken: authToken, + }, + } + + var serviceRegResp structs.ServiceRegistrationDeleteByIDResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationDeleteByIDRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + }, + name: "ACLs enabled known service with submit-job namespace token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + // Create a token using submit-job capability. + authToken := mock.CreatePolicyAndToken(t, s.State(), 30, "test-service-reg-delete", + mock.NamespacePolicy(services[0].Namespace, "", []string{acl.NamespaceCapabilityReadJob})).SecretID + + // Try and delete one of the services that exist but don't set + // an auth token. + serviceRegReq := &structs.ServiceRegistrationDeleteByIDRequest{ + ID: services[0].ID, + WriteRequest: structs.WriteRequest{ + Region: DefaultRegion, + Namespace: services[0].Namespace, + AuthToken: authToken, + }, + } + + var serviceRegResp structs.ServiceRegistrationDeleteByIDResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationDeleteByIDRPCMethod, serviceRegReq, &serviceRegResp) + require.Error(t, err) + require.Contains(t, err.Error(), "Permission denied") + }, + name: "ACLs enabled known service with read-job namespace token", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + server, aclToken, cleanup := tc.serverFn(t) + defer cleanup() + tc.testFn(t, server, aclToken) + }) + } +} + +func TestServiceRegistration_List(t *testing.T) { + t.Parallel() + + testCases := []struct { + serverFn func(t *testing.T) (*Server, *structs.ACLToken, func()) + testFn func(t *testing.T, s *Server, token *structs.ACLToken) + name string + }{ + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + // Test a request without setting an ACL token. + serviceRegReq := &structs.ServiceRegistrationListRequest{ + QueryOptions: structs.QueryOptions{ + Namespace: structs.AllNamespacesSentinel, + Region: DefaultRegion, + }, + } + var serviceRegResp structs.ServiceRegistrationListResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationListRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, []*structs.ServiceRegistrationListStub{ + { + Namespace: "default", + Services: []*structs.ServiceRegistrationStub{ + { + ServiceName: "example-cache", + Tags: []string{"foo"}, + }, + }}, + { + Namespace: "platform", + Services: []*structs.ServiceRegistrationStub{ + { + ServiceName: "countdash-api", + Tags: []string{"bar"}, + }, + }}, + }, serviceRegResp.Services) + }, + name: "ACLs disabled wildcard ns", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + // Test a request without setting an ACL token. + serviceRegReq := &structs.ServiceRegistrationListRequest{ + QueryOptions: structs.QueryOptions{ + Namespace: "platform", + Region: DefaultRegion, + }, + } + var serviceRegResp structs.ServiceRegistrationListResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationListRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, []*structs.ServiceRegistrationListStub{ + { + Namespace: "platform", + Services: []*structs.ServiceRegistrationStub{ + { + ServiceName: "countdash-api", + Tags: []string{"bar"}, + }, + }, + }, + }, serviceRegResp.Services) + }, + name: "ACLs disabled platform ns", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Test a request without setting an ACL token. + serviceRegReq := &structs.ServiceRegistrationListRequest{ + QueryOptions: structs.QueryOptions{ + Namespace: "platform", + Region: DefaultRegion, + }, + } + var serviceRegResp structs.ServiceRegistrationListResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationListRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, []*structs.ServiceRegistrationListStub{}, serviceRegResp.Services) + }, + name: "ACLs disabled no services", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + // Test a request without setting an ACL token. + serviceRegReq := &structs.ServiceRegistrationListRequest{ + QueryOptions: structs.QueryOptions{ + Namespace: structs.AllNamespacesSentinel, + Region: DefaultRegion, + }, + } + var serviceRegResp structs.ServiceRegistrationListResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationListRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, []*structs.ServiceRegistrationListStub{}, serviceRegResp.Services) + }, + name: "ACLs enabled wildcard ns without token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + // Test a request without setting an ACL token. + serviceRegReq := &structs.ServiceRegistrationListRequest{ + QueryOptions: structs.QueryOptions{ + Namespace: "default", + Region: DefaultRegion, + }, + } + var serviceRegResp structs.ServiceRegistrationListResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationListRPCMethod, serviceRegReq, &serviceRegResp) + require.Error(t, err) + require.Contains(t, err.Error(), "Permission denied") + }, + name: "ACLs enabled default ns without token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + // Test a request without setting an ACL token. + serviceRegReq := &structs.ServiceRegistrationListRequest{ + QueryOptions: structs.QueryOptions{ + Namespace: structs.AllNamespacesSentinel, + Region: DefaultRegion, + AuthToken: token.SecretID, + }, + } + var serviceRegResp structs.ServiceRegistrationListResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationListRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, []*structs.ServiceRegistrationListStub{ + { + Namespace: "default", + Services: []*structs.ServiceRegistrationStub{ + { + ServiceName: "example-cache", + Tags: []string{"foo"}, + }, + }}, + { + Namespace: "platform", + Services: []*structs.ServiceRegistrationStub{ + { + ServiceName: "countdash-api", + Tags: []string{"bar"}, + }, + }}, + }, serviceRegResp.Services) + }, + name: "ACLs enabled wildcard with management token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate and upsert some service registrations. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + // Test a request without setting an ACL token. + serviceRegReq := &structs.ServiceRegistrationListRequest{ + QueryOptions: structs.QueryOptions{ + Namespace: "default", + Region: DefaultRegion, + AuthToken: token.SecretID, + }, + } + var serviceRegResp structs.ServiceRegistrationListResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationListRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, []*structs.ServiceRegistrationListStub{ + { + Namespace: "default", + Services: []*structs.ServiceRegistrationStub{ + { + ServiceName: "example-cache", + Tags: []string{"foo"}, + }, + }}, + }, serviceRegResp.Services) + }, + name: "ACLs enabled default ns with management token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Create a policy and grab the token which has the read-job + // capability on the platform namespace. + customToken := mock.CreatePolicyAndToken(t, s.State(), 5, "test-valid-autoscaler", + mock.NamespacePolicy("platform", "", []string{acl.NamespaceCapabilityReadJob})).SecretID + + // Generate and upsert some service registrations. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + // Test a request without setting an ACL token. + serviceRegReq := &structs.ServiceRegistrationListRequest{ + QueryOptions: structs.QueryOptions{ + Namespace: "platform", + Region: DefaultRegion, + AuthToken: customToken, + }, + } + var serviceRegResp structs.ServiceRegistrationListResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationListRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, []*structs.ServiceRegistrationListStub{ + { + Namespace: "platform", + Services: []*structs.ServiceRegistrationStub{ + { + ServiceName: "countdash-api", + Tags: []string{"bar"}, + }, + }}, + }, serviceRegResp.Services) + }, + name: "ACLs enabled with read-job policy token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Create a namespace as this is needed when using an ACL like + // we do in this test. + ns := &structs.Namespace{ + Name: "platform", + Description: "test namespace", + CreateIndex: 5, + ModifyIndex: 5, + } + ns.SetHash() + require.NoError(t, s.State().UpsertNamespaces(5, []*structs.Namespace{ns})) + + // Create a policy and grab the token which has the read-job + // capability on the platform namespace. + customToken := mock.CreatePolicyAndToken(t, s.State(), 10, "test-valid-autoscaler", + mock.NamespacePolicy("platform", "", []string{acl.NamespaceCapabilityReadJob})).SecretID + + // Generate and upsert some service registrations. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services)) + + // Test a request without setting an ACL token. + serviceRegReq := &structs.ServiceRegistrationListRequest{ + QueryOptions: structs.QueryOptions{ + Namespace: structs.AllNamespacesSentinel, + Region: DefaultRegion, + AuthToken: customToken, + }, + } + var serviceRegResp structs.ServiceRegistrationListResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationListRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, []*structs.ServiceRegistrationListStub{ + { + Namespace: "platform", + Services: []*structs.ServiceRegistrationStub{ + { + ServiceName: "countdash-api", + Tags: []string{"bar"}, + }, + }}, + }, serviceRegResp.Services) + }, + name: "ACLs enabled wildcard ns with restricted token", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Create a namespace as this is needed when using an ACL like + // we do in this test. + ns := &structs.Namespace{ + Name: "platform", + Description: "test namespace", + CreateIndex: 5, + ModifyIndex: 5, + } + ns.SetHash() + require.NoError(t, s.State().UpsertNamespaces(5, []*structs.Namespace{ns})) + + // Create a policy and grab the token which has the read policy + // on the platform namespace. + customToken := mock.CreatePolicyAndToken(t, s.State(), 10, "test-valid-autoscaler", + mock.NamespacePolicy("platform", "read", nil)).SecretID + + // Generate and upsert some service registrations. + services := mock.ServiceRegistrations() + require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services)) + + // Test a request without setting an ACL token. + serviceRegReq := &structs.ServiceRegistrationListRequest{ + QueryOptions: structs.QueryOptions{ + Namespace: structs.AllNamespacesSentinel, + Region: DefaultRegion, + AuthToken: customToken, + }, + } + var serviceRegResp structs.ServiceRegistrationListResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationListRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, []*structs.ServiceRegistrationListStub{ + { + Namespace: "platform", + Services: []*structs.ServiceRegistrationStub{ + { + ServiceName: "countdash-api", + Tags: []string{"bar"}, + }, + }}, + }, serviceRegResp.Services) + }, + name: "ACLs enabled with read namespace policy token", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + server, aclToken, cleanup := tc.serverFn(t) + defer cleanup() + tc.testFn(t, server, aclToken) + }) + } +} + +func TestServiceRegistration_GetService(t *testing.T) { + t.Parallel() + + testCases := []struct { + serverFn func(t *testing.T) (*Server, *structs.ACLToken, func()) + testFn func(t *testing.T, s *Server, token *structs.ACLToken) + name string + }{ + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate mock services then upsert them individually using different indexes. + services := mock.ServiceRegistrations() + + require.NoError(t, s.fsm.State().UpsertServiceRegistrations( + structs.MsgTypeTestSetup, 10, []*structs.ServiceRegistration{services[0]})) + + require.NoError(t, s.fsm.State().UpsertServiceRegistrations( + structs.MsgTypeTestSetup, 20, []*structs.ServiceRegistration{services[1]})) + + // Lookup the first registration. + serviceRegReq := &structs.ServiceRegistrationByNameRequest{ + ServiceName: services[0].ServiceName, + QueryOptions: structs.QueryOptions{ + Namespace: services[0].Namespace, + Region: s.Region(), + }, + } + var serviceRegResp structs.ServiceRegistrationByNameResponse + err := msgpackrpc.CallWithCodec(codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.Equal(t, uint64(10), serviceRegResp.Services[0].CreateIndex) + require.Equal(t, uint64(20), serviceRegResp.Index) + require.Len(t, serviceRegResp.Services, 1) + + // Lookup the second registration. + serviceRegReq2 := &structs.ServiceRegistrationByNameRequest{ + ServiceName: services[1].ServiceName, + QueryOptions: structs.QueryOptions{ + Namespace: services[1].Namespace, + Region: s.Region(), + }, + } + var serviceRegResp2 structs.ServiceRegistrationByNameResponse + err = msgpackrpc.CallWithCodec(codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq2, &serviceRegResp2) + require.NoError(t, err) + require.Equal(t, uint64(20), serviceRegResp2.Services[0].CreateIndex) + require.Equal(t, uint64(20), serviceRegResp.Index) + require.Len(t, serviceRegResp2.Services, 1) + + // Perform a lookup with namespace and service name that shouldn't produce + // results. + serviceRegReq3 := &structs.ServiceRegistrationByNameRequest{ + ServiceName: services[0].ServiceName, + QueryOptions: structs.QueryOptions{ + Namespace: services[1].Namespace, + Region: s.Region(), + }, + } + var serviceRegResp3 structs.ServiceRegistrationByNameResponse + err = msgpackrpc.CallWithCodec(codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq3, &serviceRegResp3) + require.NoError(t, err) + require.Len(t, serviceRegResp3.Services, 0) + }, + name: "ACLs disabled", + }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + return TestACLServer(t, nil) + }, + testFn: func(t *testing.T, s *Server, token *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate mock services then upsert them individually using different indexes. + services := mock.ServiceRegistrations() + + require.NoError(t, s.fsm.State().UpsertServiceRegistrations( + structs.MsgTypeTestSetup, 10, []*structs.ServiceRegistration{services[0]})) + + require.NoError(t, s.fsm.State().UpsertServiceRegistrations( + structs.MsgTypeTestSetup, 20, []*structs.ServiceRegistration{services[1]})) + + // Lookup the first registration without using an ACL token + // which should fail. + serviceRegReq := &structs.ServiceRegistrationByNameRequest{ + ServiceName: services[0].ServiceName, + QueryOptions: structs.QueryOptions{ + Namespace: services[0].Namespace, + Region: s.Region(), + }, + } + var serviceRegResp structs.ServiceRegistrationByNameResponse + err := msgpackrpc.CallWithCodec(codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq, &serviceRegResp) + require.Error(t, err) + require.Contains(t, err.Error(), "Permission denied") + + // Lookup the first registration using the management token. + serviceRegReq2 := &structs.ServiceRegistrationByNameRequest{ + ServiceName: services[0].ServiceName, + QueryOptions: structs.QueryOptions{ + Namespace: services[0].Namespace, + Region: s.Region(), + AuthToken: token.SecretID, + }, + } + var serviceRegResp2 structs.ServiceRegistrationByNameResponse + err = msgpackrpc.CallWithCodec(codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq2, &serviceRegResp2) + require.Nil(t, err) + require.Len(t, serviceRegResp2.Services, 1) + require.EqualValues(t, 20, serviceRegResp2.Index) + + // Create a read policy for the default namespace and test this + // can correctly read the first service. + authToken1 := mock.CreatePolicyAndToken(t, s.State(), 30, "test-service-reg-get", + mock.NamespacePolicy(structs.DefaultNamespace, "read", nil)).SecretID + serviceRegReq3 := &structs.ServiceRegistrationByNameRequest{ + ServiceName: services[0].ServiceName, + QueryOptions: structs.QueryOptions{ + Namespace: services[0].Namespace, + Region: s.Region(), + AuthToken: authToken1, + }, + } + var serviceRegResp3 structs.ServiceRegistrationByNameResponse + err = msgpackrpc.CallWithCodec(codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq3, &serviceRegResp3) + require.Nil(t, err) + require.Len(t, serviceRegResp3.Services, 1) + require.EqualValues(t, 20, serviceRegResp2.Index) + + // Attempting to lookup services in a different namespace should fail. + serviceRegReq4 := &structs.ServiceRegistrationByNameRequest{ + ServiceName: services[1].ServiceName, + QueryOptions: structs.QueryOptions{ + Namespace: services[1].Namespace, + Region: s.Region(), + AuthToken: authToken1, + }, + } + var serviceRegResp4 structs.ServiceRegistrationByNameResponse + err = msgpackrpc.CallWithCodec(codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq4, &serviceRegResp4) + require.Error(t, err) + require.Contains(t, err.Error(), "Permission denied") + }, + name: "ACLs enabled", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + server, aclToken, cleanup := tc.serverFn(t) + defer cleanup() + tc.testFn(t, server, aclToken) + }) + } +} diff --git a/nomad/structs/alloc.go b/nomad/structs/alloc.go new file mode 100644 index 000000000000..9f1dee8d079c --- /dev/null +++ b/nomad/structs/alloc.go @@ -0,0 +1,24 @@ +package structs + +const ( + // AllocServiceRegistrationsRPCMethod is the RPC method for listing all + // service registrations assigned to a specific allocation. + // + // Args: AllocServiceRegistrationsRequest + // Reply: AllocServiceRegistrationsResponse + AllocServiceRegistrationsRPCMethod = "Alloc.GetServiceRegistrations" +) + +// AllocServiceRegistrationsRequest is the request object used to list all +// service registrations belonging to the specified Allocation.ID. +type AllocServiceRegistrationsRequest struct { + AllocID string + QueryOptions +} + +// AllocServiceRegistrationsResponse is the response object when performing a +// listing of services belonging to an allocation. +type AllocServiceRegistrationsResponse struct { + Services []*ServiceRegistration + QueryMeta +} diff --git a/nomad/structs/alloc_test.go b/nomad/structs/alloc_test.go new file mode 100644 index 000000000000..ce2ce52dab8f --- /dev/null +++ b/nomad/structs/alloc_test.go @@ -0,0 +1,12 @@ +package structs + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestAllocServiceRegistrationsRequest_StaleReadSupport(t *testing.T) { + req := &AllocServiceRegistrationsRequest{} + require.True(t, req.IsRead()) +} diff --git a/nomad/structs/job.go b/nomad/structs/job.go new file mode 100644 index 000000000000..07c79023af79 --- /dev/null +++ b/nomad/structs/job.go @@ -0,0 +1,24 @@ +package structs + +const ( + // JobServiceRegistrationsRPCMethod is the RPC method for listing all + // service registrations assigned to a specific namespaced job. + // + // Args: JobServiceRegistrationsRequest + // Reply: JobServiceRegistrationsResponse + JobServiceRegistrationsRPCMethod = "Job.GetServiceRegistrations" +) + +// JobServiceRegistrationsRequest is the request object used to list all +// service registrations belonging to the specified Job.ID. +type JobServiceRegistrationsRequest struct { + JobID string + QueryOptions +} + +// JobServiceRegistrationsResponse is the response object when performing a +// listing of services belonging to a namespaced job. +type JobServiceRegistrationsResponse struct { + Services []*ServiceRegistration + QueryMeta +} diff --git a/nomad/structs/job_test.go b/nomad/structs/job_test.go new file mode 100644 index 000000000000..e64339510ce4 --- /dev/null +++ b/nomad/structs/job_test.go @@ -0,0 +1,12 @@ +package structs + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestServiceRegistrationsRequest_StaleReadSupport(t *testing.T) { + req := &AllocServiceRegistrationsRequest{} + require.True(t, req.IsRead()) +} diff --git a/nomad/structs/service_registration.go b/nomad/structs/service_registration.go index 9ebbe873b4d0..88c03c4d8838 100644 --- a/nomad/structs/service_registration.go +++ b/nomad/structs/service_registration.go @@ -1,6 +1,41 @@ package structs -import "github.com/hashicorp/nomad/helper" +import ( + "fmt" + + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/ipaddr" +) + +const ( + // ServiceRegistrationUpsertRPCMethod is the RPC method for upserting + // service registrations into Nomad state. + // + // Args: ServiceRegistrationUpsertRequest + // Reply: ServiceRegistrationUpsertResponse + ServiceRegistrationUpsertRPCMethod = "ServiceRegistration.Upsert" + + // ServiceRegistrationDeleteByIDRPCMethod is the RPC method for deleting + // a service registration by its ID. + // + // Args: ServiceRegistrationDeleteByIDRequest + // Reply: ServiceRegistrationDeleteByIDResponse + ServiceRegistrationDeleteByIDRPCMethod = "ServiceRegistration.DeleteByID" + + // ServiceRegistrationListRPCMethod is the RPC method for listing service + // registrations within Nomad. + // + // Args: ServiceRegistrationListRequest + // Reply: ServiceRegistrationListResponse + ServiceRegistrationListRPCMethod = "ServiceRegistration.List" + + // ServiceRegistrationGetServiceRPCMethod is the RPC method for detailing a + // service and its registrations according to its name. + // + // Args: ServiceRegistrationByNameRequest + // Reply: ServiceRegistrationByNameResponse + ServiceRegistrationGetServiceRPCMethod = "ServiceRegistration.GetService" +) // ServiceRegistration is the internal representation of a Nomad service // registration. @@ -24,7 +59,8 @@ type ServiceRegistration struct { NodeID string // Datacenter is the DC identifier of the node as identified by - // Node.Datacenter. It is denormalized here to allow filtering services by datacenter without looking up every node. + // Node.Datacenter. It is denormalized here to allow filtering services by + // datacenter without looking up every node. Datacenter string // JobID is Job.ID and represents the job which contained the service block @@ -105,3 +141,98 @@ func (s *ServiceRegistration) Equals(o *ServiceRegistration) bool { } return true } + +// Validate ensures the upserted service registration contains valid +// information and routing capabilities. Objects should never fail here as +// Nomad controls the entire registration process; but it's possible +// configuration problems could cause failures. +func (s *ServiceRegistration) Validate() error { + if ipaddr.IsAny(s.Address) { + return fmt.Errorf("invalid service registration address") + } + return nil +} + +// ServiceRegistrationUpsertRequest is the request object used to upsert one or +// more service registrations. +type ServiceRegistrationUpsertRequest struct { + Services []*ServiceRegistration + WriteRequest +} + +// ServiceRegistrationUpsertResponse is the response object when one or more +// service registrations have been successfully upserted into state. +type ServiceRegistrationUpsertResponse struct { + WriteMeta +} + +// ServiceRegistrationDeleteByIDRequest is the request object to delete a +// service registration as specified by the ID parameter. +type ServiceRegistrationDeleteByIDRequest struct { + ID string + WriteRequest +} + +// ServiceRegistrationDeleteByIDResponse is the response object when performing a +// deletion of an individual service registration. +type ServiceRegistrationDeleteByIDResponse struct { + WriteMeta +} + +// ServiceRegistrationDeleteByNodeIDRequest is the request object to delete all +// service registrations assigned to a particular node. +type ServiceRegistrationDeleteByNodeIDRequest struct { + NodeID string + WriteRequest +} + +// ServiceRegistrationDeleteByNodeIDResponse is the response object when +// performing a deletion of all service registrations assigned to a particular +// node. +type ServiceRegistrationDeleteByNodeIDResponse struct { + WriteMeta +} + +// ServiceRegistrationListRequest is the request object when performing service +// registration listings. +type ServiceRegistrationListRequest struct { + QueryOptions +} + +// ServiceRegistrationListResponse is the response object when performing a +// list of services. This is specifically concise to reduce the serialisation +// and network costs endpoint incur, particularly when performing blocking list +// queries. +type ServiceRegistrationListResponse struct { + Services []*ServiceRegistrationListStub + QueryMeta +} + +// ServiceRegistrationListStub is the object which contains a list of namespace +// service registrations and their tags. +type ServiceRegistrationListStub struct { + Namespace string + Services []*ServiceRegistrationStub +} + +// ServiceRegistrationStub is the stub object describing an individual +// namespaced service. The object is built in a manner which would allow us to +// add additional fields in the future, if we wanted. +type ServiceRegistrationStub struct { + ServiceName string + Tags []string +} + +// ServiceRegistrationByNameRequest is the request object to perform a lookup +// of services matching a specific name. +type ServiceRegistrationByNameRequest struct { + ServiceName string + QueryOptions +} + +// ServiceRegistrationByNameResponse is the response object when performing a +// lookup of services matching a specific name. +type ServiceRegistrationByNameResponse struct { + Services []*ServiceRegistration + QueryMeta +} diff --git a/nomad/structs/service_registration_test.go b/nomad/structs/service_registration_test.go index 2d362b821aa5..d8fe06b27537 100644 --- a/nomad/structs/service_registration_test.go +++ b/nomad/structs/service_registration_test.go @@ -381,3 +381,13 @@ func TestServiceRegistration_Equal(t *testing.T) { }) } } + +func TestServiceRegistrationListRequest_StaleReadSupport(t *testing.T) { + req := &ServiceRegistrationListRequest{} + require.True(t, req.IsRead()) +} + +func TestServiceRegistrationByNameRequest_StaleReadSupport(t *testing.T) { + req := &ServiceRegistrationByNameRequest{} + require.True(t, req.IsRead()) +}