diff --git a/.changelog/12862.txt b/.changelog/12862.txt new file mode 100644 index 000000000000..182a8b42dfd9 --- /dev/null +++ b/.changelog/12862.txt @@ -0,0 +1,3 @@ +```release-note:improvement +api: enable setting ?choose parameter when querying services +``` diff --git a/command/agent/service_registration_endpoint.go b/command/agent/service_registration_endpoint.go index 9107f8006970..3d6151bea83f 100644 --- a/command/agent/service_registration_endpoint.go +++ b/command/agent/service_registration_endpoint.go @@ -90,7 +90,10 @@ func (s *HTTPServer) ServiceRegistrationRequest(resp http.ResponseWriter, req *h func (s *HTTPServer) serviceGetRequest( resp http.ResponseWriter, req *http.Request, serviceName string) (interface{}, error) { - args := structs.ServiceRegistrationByNameRequest{ServiceName: serviceName} + args := structs.ServiceRegistrationByNameRequest{ + ServiceName: serviceName, + Choose: req.URL.Query().Get("choose"), + } if s.parse(resp, req, &args.Region, &args.QueryOptions) { return nil, nil } diff --git a/command/agent/service_registration_endpoint_test.go b/command/agent/service_registration_endpoint_test.go index d6e6954377fd..489c6fa983bf 100644 --- a/command/agent/service_registration_endpoint_test.go +++ b/command/agent/service_registration_endpoint_test.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -157,6 +158,7 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { name string }{ { + name: "delete by ID", testFn: func(s *TestAgent) { // Grab the state, so we can manipulate it and test against it. @@ -186,9 +188,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { require.Nil(t, out) require.NoError(t, err) }, - name: "delete by ID", }, { + name: "get service by name", testFn: func(s *TestAgent) { // Grab the state, so we can manipulate it and test against it. @@ -214,9 +216,99 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { require.NotZero(t, respW.Header().Get("X-Nomad-Index")) require.Equal(t, serviceReg, obj.([]*structs.ServiceRegistration)[0]) }, - name: "get service by name", }, { + name: "get service using choose", + testFn: func(s *TestAgent) { + // Grab the state so we can manipulate and test against it. + testState := s.Agent.server.State() + + err := testState.UpsertServiceRegistrations( + structs.MsgTypeTestSetup, 10, + []*structs.ServiceRegistration{{ + ID: "978d519a-46ad-fb04-966b-000000000001", + ServiceName: "redis", + Namespace: "default", + NodeID: "node1", + Datacenter: "dc1", + JobID: "job1", + AllocID: "8b83191f-cb29-e23a-d955-220b65ef676d", + Tags: nil, + Address: "10.0.0.1", + Port: 8080, + CreateIndex: 10, + ModifyIndex: 10, + }, { + ID: "978d519a-46ad-fb04-966b-000000000002", + ServiceName: "redis", + Namespace: "default", + NodeID: "node2", + Datacenter: "dc1", + JobID: "job1", + AllocID: "df6de93c-9376-a774-bcdf-3bd817e18078", + Tags: nil, + Address: "10.0.0.2", + Port: 8080, + CreateIndex: 10, + ModifyIndex: 10, + }, { + ID: "978d519a-46ad-fb04-966b-000000000003", + ServiceName: "redis", + Namespace: "default", + NodeID: "node3", + Datacenter: "dc1", + JobID: "job1", + AllocID: "df6de93c-9376-a774-bcdf-3bd817e18078", + Tags: nil, + Address: "10.0.0.3", + Port: 8080, + CreateIndex: 10, + ModifyIndex: 10, + }}, + ) + must.NoError(t, err) + + // Build the HTTP request for 1 instance of the service, using key=abc123 + req, err := http.NewRequest(http.MethodGet, "/v1/service/redis?choose=1|abc123", nil) + must.NoError(t, err) + respW := httptest.NewRecorder() + + // Send the HTTP request. + obj, err := s.Server.ServiceRegistrationRequest(respW, req) + must.NoError(t, err) + + // Check we got the correct type back. + services, ok := (obj).([]*structs.ServiceRegistration) + must.True(t, ok) + + // Check we got the expected number of services back. + must.Len(t, 1, services) + + // Build the HTTP request for 2 instances of the service, still using key=abc123 + req2, err := http.NewRequest(http.MethodGet, "/v1/service/redis?choose=2|abc123", nil) + must.NoError(t, err) + respW2 := httptest.NewRecorder() + + // Send the 2nd HTTP request. + obj2, err := s.Server.ServiceRegistrationRequest(respW2, req2) + must.NoError(t, err) + + // Check we got the correct type back. + services2, ok := (obj2).([]*structs.ServiceRegistration) + must.True(t, ok) + + // Check we got the expected number of services back. + must.Len(t, 2, services2) + + // Check the first service is the same as the previous service. + must.Eq(t, services[0], services2[0]) + + // Check the second service is not the same as the first service. + must.NotEq(t, services2[0], services2[1]) + }, + }, + { + name: "incorrect URI format", testFn: func(s *TestAgent) { // Build the HTTP request. @@ -230,9 +322,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { require.Contains(t, err.Error(), "invalid URI") require.Nil(t, obj) }, - name: "incorrect URI format", }, { + name: "get service empty name", testFn: func(s *TestAgent) { // Build the HTTP request. @@ -246,9 +338,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { require.Contains(t, err.Error(), "missing service name") require.Nil(t, obj) }, - name: "get service empty name", }, { + name: "get service incorrect method", testFn: func(s *TestAgent) { // Build the HTTP request. @@ -262,9 +354,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { require.Contains(t, err.Error(), "Invalid method") require.Nil(t, obj) }, - name: "get service incorrect method", }, { + name: "delete service empty id", testFn: func(s *TestAgent) { // Build the HTTP request. @@ -278,9 +370,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { require.Contains(t, err.Error(), "missing service id") require.Nil(t, obj) }, - name: "delete service empty id", }, { + name: "delete service incorrect method", testFn: func(s *TestAgent) { // Build the HTTP request. @@ -294,7 +386,6 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { require.Contains(t, err.Error(), "Invalid method") require.Nil(t, obj) }, - name: "delete service incorrect method", }, } diff --git a/go.mod b/go.mod index 571c0fa69d02..8e3300bb81cc 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/gosuri/uilive v0.0.4 github.com/grpc-ecosystem/go-grpc-middleware v1.2.1-0.20200228141219-3ce3d519df39 github.com/hashicorp/consul v1.7.8 - github.com/hashicorp/consul-template v0.29.0 + github.com/hashicorp/consul-template v0.29.1 github.com/hashicorp/consul/api v1.13.0 github.com/hashicorp/consul/sdk v0.8.0 github.com/hashicorp/cronexpr v1.1.1 diff --git a/nomad/service_registration_endpoint.go b/nomad/service_registration_endpoint.go index 3684fc460746..f417a8c85188 100644 --- a/nomad/service_registration_endpoint.go +++ b/nomad/service_registration_endpoint.go @@ -2,6 +2,9 @@ package nomad import ( "net/http" + "sort" + "strconv" + "strings" "time" "github.com/armon/go-metrics" @@ -423,6 +426,16 @@ func (s *ServiceRegistration) GetService( http.StatusBadRequest, "failed to read result page: %v", err) } + // Select which subset and the order of services to return if using ?choose + if args.Choose != "" { + chosen, chooseErr := s.choose(services, args.Choose) + if chooseErr != nil { + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to choose services: %v", chooseErr) + } + services = chosen + } + // Populate the reply. reply.Services = services reply.NextToken = nextToken @@ -434,6 +447,70 @@ func (s *ServiceRegistration) GetService( }) } +// choose uses rendezvous hashing to make a stable selection of a subset of services +// to return. +// +// parameter must in the form "|", where number is the number of services +// to select, and key is incorporated in the hashing function with each service - +// creating a unique yet consistent priority distribution pertaining to the requester. +// In practice (i.e. via consul-template), the key is the AllocID generating a request +// for upstream services. +// +// https://en.wikipedia.org/wiki/Rendezvous_hashing +// w := priority (i.e. hash value) +// h := hash function +// O := object - (i.e. requesting service - using key (allocID) as a proxy) +// S := site (i.e. destination service) +func (*ServiceRegistration) choose(services []*structs.ServiceRegistration, parameter string) ([]*structs.ServiceRegistration, error) { + // extract the number of services + tokens := strings.SplitN(parameter, "|", 2) + if len(tokens) != 2 { + return nil, structs.ErrMalformedChooseParameter + } + n, err := strconv.Atoi(tokens[0]) + if err != nil { + return nil, structs.ErrMalformedChooseParameter + } + + // extract the hash key + key := tokens[1] + if key == "" { + return nil, structs.ErrMalformedChooseParameter + } + + // if there are fewer services than requested, go with the number of services + if l := len(services); l < n { + n = l + } + + type pair struct { + hash string + service *structs.ServiceRegistration + } + + // associate hash for each service + priorities := make([]*pair, len(services)) + for i, service := range services { + priorities[i] = &pair{ + hash: service.HashWith(key), + service: service, + } + } + + // sort by the hash; creating random distribution of priority + sort.SliceStable(priorities, func(i, j int) bool { + return priorities[i].hash < priorities[j].hash + }) + + // choose top n services + chosen := make([]*structs.ServiceRegistration, n) + for i := 0; i < n; i++ { + chosen[i] = priorities[i].service + } + + return chosen, nil +} + // handleMixedAuthEndpoint is a helper to handle auth on RPC endpoints that can // either be called by Nomad nodes, or by external clients. func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions, cap string) error { @@ -451,7 +528,7 @@ func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions, } } default: - // In the event we got any error other than notfound, consider this + // In the event we got any error other than ErrTokenNotFound, consider this // terminal. if err != structs.ErrTokenNotFound { return err diff --git a/nomad/service_registration_endpoint_test.go b/nomad/service_registration_endpoint_test.go index 164f4d8a16fb..34c76eed6c75 100644 --- a/nomad/service_registration_endpoint_test.go +++ b/nomad/service_registration_endpoint_test.go @@ -5,12 +5,13 @@ import ( "testing" "github.com/hashicorp/go-memdb" - msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -1174,6 +1175,148 @@ func TestServiceRegistration_GetService(t *testing.T) { }, name: "filtering and pagination", }, + { + name: "choose 2 of 3", + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // insert 3 instances of service s1 + nodeID, jobID, allocID := "node_id", "job_id", "alloc_id" + services := []*structs.ServiceRegistration{ + { + ID: "id_1", + Namespace: "default", + ServiceName: "s1", + NodeID: nodeID, + Datacenter: "dc1", + JobID: jobID, + AllocID: allocID, + Tags: []string{"tag1"}, + Address: "10.0.0.1", + Port: 9001, + CreateIndex: 101, + ModifyIndex: 201, + }, + { + ID: "id_2", + Namespace: "default", + ServiceName: "s1", + NodeID: nodeID, + Datacenter: "dc1", + JobID: jobID, + AllocID: allocID, + Tags: []string{"tag2"}, + Address: "10.0.0.2", + Port: 9002, + CreateIndex: 102, + ModifyIndex: 202, + }, + { + ID: "id_3", + Namespace: "default", + ServiceName: "s1", + NodeID: nodeID, + Datacenter: "dc1", + JobID: jobID, + AllocID: allocID, + Tags: []string{"tag3"}, + Address: "10.0.0.3", + Port: 9003, + CreateIndex: 103, + ModifyIndex: 103, + }, + } + must.NoError(t, s.fsm.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + serviceRegReq := &structs.ServiceRegistrationByNameRequest{ + ServiceName: "s1", + Choose: "2|abc123", // select 2 in consistent order + QueryOptions: structs.QueryOptions{ + Namespace: structs.DefaultNamespace, + Region: DefaultRegion, + }, + } + var serviceRegResp structs.ServiceRegistrationByNameResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq, &serviceRegResp) + must.NoError(t, err) + + result := serviceRegResp.Services + + must.Len(t, 2, result) + must.Eq(t, "10.0.0.3", result[0].Address) + must.Eq(t, "10.0.0.2", result[1].Address) + }, + }, + { + name: "choose 3 of 2", // gracefully handle requesting too many + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // insert 2 instances of service s1 + nodeID, jobID, allocID := "node_id", "job_id", "alloc_id" + services := []*structs.ServiceRegistration{ + { + ID: "id_1", + Namespace: "default", + ServiceName: "s1", + NodeID: nodeID, + Datacenter: "dc1", + JobID: jobID, + AllocID: allocID, + Tags: []string{"tag1"}, + Address: "10.0.0.1", + Port: 9001, + CreateIndex: 101, + ModifyIndex: 201, + }, + { + ID: "id_2", + Namespace: "default", + ServiceName: "s1", + NodeID: nodeID, + Datacenter: "dc1", + JobID: jobID, + AllocID: allocID, + Tags: []string{"tag2"}, + Address: "10.0.0.2", + Port: 9002, + CreateIndex: 102, + ModifyIndex: 202, + }, + } + must.NoError(t, s.fsm.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + serviceRegReq := &structs.ServiceRegistrationByNameRequest{ + ServiceName: "s1", + Choose: "3|abc123", // select 3 in consistent order (though there are only 2 total) + QueryOptions: structs.QueryOptions{ + Namespace: structs.DefaultNamespace, + Region: DefaultRegion, + }, + } + var serviceRegResp structs.ServiceRegistrationByNameResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq, &serviceRegResp) + must.NoError(t, err) + + result := serviceRegResp.Services + + must.Len(t, 2, result) + must.Eq(t, "10.0.0.2", result[0].Address) + must.Eq(t, "10.0.0.1", result[1].Address) + }, + }, } for _, tc := range testCases { @@ -1184,3 +1327,79 @@ func TestServiceRegistration_GetService(t *testing.T) { }) } } + +func TestServiceRegistration_chooseErr(t *testing.T) { + ci.Parallel(t) + + sr := (*ServiceRegistration)(nil) + try := func(input []*structs.ServiceRegistration, parameter string) { + result, err := sr.choose(input, parameter) + must.Empty(t, result) + must.ErrorIs(t, err, structs.ErrMalformedChooseParameter) + } + + regs := []*structs.ServiceRegistration{ + {ID: "abc001", ServiceName: "s1"}, + {ID: "abc002", ServiceName: "s2"}, + {ID: "abc003", ServiceName: "s3"}, + } + + try(regs, "") + try(regs, "1|") + try(regs, "|abc") + try(regs, "a|abc") +} + +func TestServiceRegistration_choose(t *testing.T) { + ci.Parallel(t) + + sr := (*ServiceRegistration)(nil) + try := func(input, exp []*structs.ServiceRegistration, parameter string) { + result, err := sr.choose(input, parameter) + must.NoError(t, err) + must.Eq(t, exp, result) + } + + // zero services + try(nil, []*structs.ServiceRegistration{}, "1|aaa") + try(nil, []*structs.ServiceRegistration{}, "2|aaa") + + // some unique services + regs := []*structs.ServiceRegistration{ + {ID: "abc001", ServiceName: "s1"}, + {ID: "abc002", ServiceName: "s1"}, + {ID: "abc003", ServiceName: "s1"}, + } + + // same key, increasing n -> maintains order (n=1) + try(regs, []*structs.ServiceRegistration{ + {ID: "abc002", ServiceName: "s1"}, + }, "1|aaa") + + // same key, increasing n -> maintains order (n=2) + try(regs, []*structs.ServiceRegistration{ + {ID: "abc002", ServiceName: "s1"}, + {ID: "abc003", ServiceName: "s1"}, + }, "2|aaa") + + // same key, increasing n -> maintains order (n=3) + try(regs, []*structs.ServiceRegistration{ + {ID: "abc002", ServiceName: "s1"}, + {ID: "abc003", ServiceName: "s1"}, + {ID: "abc001", ServiceName: "s1"}, + }, "3|aaa") + + // unique key -> different orders + try(regs, []*structs.ServiceRegistration{ + {ID: "abc001", ServiceName: "s1"}, + {ID: "abc002", ServiceName: "s1"}, + {ID: "abc003", ServiceName: "s1"}, + }, "3|bbb") + + // another key -> another order + try(regs, []*structs.ServiceRegistration{ + {ID: "abc002", ServiceName: "s1"}, + {ID: "abc003", ServiceName: "s1"}, + {ID: "abc001", ServiceName: "s1"}, + }, "3|ccc") +} diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go index 69747a40e2ba..b98d814761f6 100644 --- a/nomad/structs/errors.go +++ b/nomad/structs/errors.go @@ -20,6 +20,7 @@ const ( errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later" errMissingAllocID = "Missing allocation ID" errIncompatibleFiltering = "Filter expression cannot be used with other filter parameters" + errMalformedChooseParameter = "Parameter for choose must be in form '|'" // Prefix based errors that are used to check if the error is of a given // type. These errors should be created with the associated constructor. @@ -55,6 +56,7 @@ var ( ErrNodeLacksRpc = errors.New(errNodeLacksRpc) ErrMissingAllocID = errors.New(errMissingAllocID) ErrIncompatibleFiltering = errors.New(errIncompatibleFiltering) + ErrMalformedChooseParameter = errors.New(errMalformedChooseParameter) ErrUnknownNode = errors.New(ErrUnknownNodePrefix) diff --git a/nomad/structs/service_registration.go b/nomad/structs/service_registration.go index 31554fe5d8f7..9929e83fc7c5 100644 --- a/nomad/structs/service_registration.go +++ b/nomad/structs/service_registration.go @@ -1,6 +1,8 @@ package structs import ( + "crypto/md5" + "encoding/binary" "fmt" "github.com/hashicorp/nomad/helper" @@ -171,6 +173,25 @@ func (s *ServiceRegistration) GetNamespace() string { return s.Namespace } +// HashWith generates a unique value representative of s based on the contents of s. +func (s *ServiceRegistration) HashWith(key string) string { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(s.Port)) + + sum := md5.New() + sum.Write(buf) + sum.Write([]byte(s.AllocID)) + sum.Write([]byte(s.ID)) + sum.Write([]byte(s.Namespace)) + sum.Write([]byte(s.Address)) + sum.Write([]byte(s.ServiceName)) + for _, tag := range s.Tags { + sum.Write([]byte(tag)) + } + sum.Write([]byte(key)) + return fmt.Sprintf("%x", sum.Sum(nil)) +} + // ServiceRegistrationUpsertRequest is the request object used to upsert one or // more service registrations. type ServiceRegistrationUpsertRequest struct { @@ -245,6 +266,7 @@ type ServiceRegistrationStub struct { // of services matching a specific name. type ServiceRegistrationByNameRequest struct { ServiceName string + Choose string // stable selection of n services QueryOptions } diff --git a/nomad/structs/service_registration_test.go b/nomad/structs/service_registration_test.go index da9c0773bc96..cfa8e3bb23d1 100644 --- a/nomad/structs/service_registration_test.go +++ b/nomad/structs/service_registration_test.go @@ -3,6 +3,7 @@ package structs import ( "testing" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -447,3 +448,27 @@ func TestServiceRegistrationByNameRequest_StaleReadSupport(t *testing.T) { req := &ServiceRegistrationByNameRequest{} require.True(t, req.IsRead()) } + +func TestServiceRegistration_HashWith(t *testing.T) { + a := ServiceRegistration{ + Address: "10.0.0.1", + Port: 9999, + } + + // same service, same key -> same hash + must.Eq(t, a.HashWith("aaa"), a.HashWith("aaa")) + + // same service, different key -> different hash + must.NotEq(t, a.HashWith("aaa"), a.HashWith("bbb")) + + b := ServiceRegistration{ + Address: "10.0.0.2", + Port: 9998, + } + + // different service, same key -> different hash + must.NotEq(t, a.HashWith("aaa"), b.HashWith("aaa")) + + // different service, different key -> different hash + must.NotEq(t, a.HashWith("aaa"), b.HashWith("bbb")) +} diff --git a/website/content/api-docs/services.mdx b/website/content/api-docs/services.mdx index 465a2ce9aee5..7905d3cb99d3 100644 --- a/website/content/api-docs/services.mdx +++ b/website/content/api-docs/services.mdx @@ -94,6 +94,11 @@ The table below shows this endpoint's support for used to filter the results. Consider using pagination or a query parameter to reduce resource used to serve the request. +- `choose` `(string: "")` - Specifies the number of services to return and a hash + key. Must be in the form `|`. Nomad uses [rendezvous hashing][hash] to deliver + consistent results for a given key, and stable results when the number of services + changes. + ### Sample Request ```shell-session @@ -173,3 +178,5 @@ $ curl \ --request DELETE \ https://localhost:4646/v1/service/example-cache-redis/_nomad-task-ba731da0-6df9-9858-ef23-806e9758a899-redis-example-cache-redis-db ``` + +[hash]: https://en.wikipedia.org/wiki/Rendezvous_hashing \ No newline at end of file