Skip to content

Commit

Permalink
api: enable selecting subset of services using rendezvous hashing
Browse files Browse the repository at this point in the history
This PR adds the 'choose' query parameter to the '/v1/service/<service>' endpoint.

The value of 'choose' is in the form '<number>|<key>', number is the number
of desired services and key is a value unique but consistent to the requester
(e.g. allocID).

Folks aren't really expected to use this API directly, but rather through consul-template
which will soon be getting a new helper function making use of this query parameter.

Example,

curl 'localhost:4646/v1/service/redis?choose=2|abc123'
  • Loading branch information
shoenig committed May 12, 2022
1 parent f0031cf commit 56b93a7
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 2 deletions.
8 changes: 8 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ type QueryOptions struct {
// Currently only supported by specific endpoints.
Reverse bool

// Choose is used when selecting a subset of services from the normal
// complete list of services. Format is <count>|<hash>, where the hash is
// typically the allocation ID of the requesting task.
Choose string

// ctx is an optional context pass through to the underlying HTTP
// request layer. Use Context() and WithContext() to manage this.
ctx context.Context
Expand Down Expand Up @@ -611,6 +616,9 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.Filter != "" {
r.params.Set("filter", q.Filter)
}
if q.Choose != "" {
r.params.Set("choose", q.Choose)
}
if q.PerPage != 0 {
r.params.Set("per_page", fmt.Sprint(q.PerPage))
}
Expand Down
5 changes: 4 additions & 1 deletion command/agent/service_registration_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func (s *HTTPServer) ServiceRegistrationRequest(resp http.ResponseWriter, req *h
func (s *HTTPServer) serviceGetRequest(
resp http.ResponseWriter, req *http.Request, serviceName string) (interface{}, error) {

args := structs.ServiceRegistrationByNameRequest{ServiceName: serviceName}
args := structs.ServiceRegistrationByNameRequest{
ServiceName: serviceName,
Choose: req.URL.Query().Get("choose"),
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ module github.com/hashicorp/nomad

go 1.17

// remove
replace github.com/hashicorp/consul-template => ../consul-template

// Pinned dependencies are noted in github.com/hashicorp/nomad/issues/11826
replace (
github.com/Microsoft/go-winio => github.com/endocrimes/go-winio v0.4.13-0.20190628114223-fb47a8b41948
Expand Down
79 changes: 78 additions & 1 deletion nomad/service_registration_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package nomad

import (
"net/http"
"sort"
"strconv"
"strings"
"time"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -423,6 +426,16 @@ func (s *ServiceRegistration) GetService(
http.StatusBadRequest, "failed to read result page: %v", err)
}

// Select which subset and the order of services to return if using ?choose
if args.Choose != "" {
chosen, chooseErr := s.choose(services, args.Choose)
if chooseErr != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to choose services: %v", chooseErr)
}
services = chosen
}

// Populate the reply.
reply.Services = services
reply.NextToken = nextToken
Expand All @@ -434,6 +447,70 @@ func (s *ServiceRegistration) GetService(
})
}

// choose uses rendezvous hashing to make a stable selection of a subset of services
// to return.
//
// parameter must in the form "<number>|<key>", where number is the number of services
// to select, and key is incorporated in the hashing function with each service -
// creating a unique yet consistent priority distribution pertaining to the requester.
// In practice (i.e. via consul-template), the key is the AllocID generating a request
// for upstream services.
//
// https://en.wikipedia.org/wiki/Rendezvous_hashing
// w := priority (i.e. hash value)
// h := hash function
// O := object - (i.e. requesting service - using key (allocID) as a proxy)
// S := site (i.e. destination service)
func (*ServiceRegistration) choose(services []*structs.ServiceRegistration, parameter string) ([]*structs.ServiceRegistration, error) {
// extract the number of services
tokens := strings.SplitN(parameter, "|", 2)
if len(tokens) != 2 {
return nil, structs.ErrMalformedChooseParameter
}
n, err := strconv.Atoi(tokens[0])
if err != nil {
return nil, structs.ErrMalformedChooseParameter
}

// extract the hash key
key := tokens[1]
if key == "" {
return nil, structs.ErrMalformedChooseParameter
}

// if there are fewer services than requested, go with the number of services
if l := len(services); l < n {
n = l
}

type pair struct {
hash string
service *structs.ServiceRegistration
}

// associate hash for each service
priorities := make([]*pair, len(services))
for i, service := range services {
priorities[i] = &pair{
hash: service.HashWith(key),
service: service,
}
}

// sort by the hash; creating random distribution of priority
sort.SliceStable(priorities, func(i, j int) bool {
return priorities[i].hash < priorities[j].hash
})

// choose top n services
chosen := make([]*structs.ServiceRegistration, n)
for i := 0; i < n; i++ {
chosen[i] = priorities[i].service
}

return chosen, nil
}

// handleMixedAuthEndpoint is a helper to handle auth on RPC endpoints that can
// either be called by Nomad nodes, or by external clients.
func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions, cap string) error {
Expand All @@ -451,7 +528,7 @@ func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions,
}
}
default:
// In the event we got any error other than notfound, consider this
// In the event we got any error other than ErrTokenNotFound, consider this
// terminal.
if err != structs.ErrTokenNotFound {
return err
Expand Down
154 changes: 154 additions & 0 deletions nomad/service_registration_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,84 @@ func TestServiceRegistration_GetService(t *testing.T) {
},
name: "filtering and pagination",
},
{
name: "choose 2",
serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) {
server, cleanup := TestServer(t, nil)
return server, nil, cleanup
},
testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)

// insert 3 instances of service s1
nodeID, jobID, allocID := "node_id", "job_id", "alloc_id"
services := []*structs.ServiceRegistration{
{
ID: "id_1",
Namespace: "default",
ServiceName: "s1",
NodeID: nodeID,
Datacenter: "dc1",
JobID: jobID,
AllocID: allocID,
Tags: []string{"tag1"},
Address: "10.0.0.1",
Port: 9001,
CreateIndex: 101,
ModifyIndex: 201,
},
{
ID: "id_2",
Namespace: "default",
ServiceName: "s1",
NodeID: nodeID,
Datacenter: "dc1",
JobID: jobID,
AllocID: allocID,
Tags: []string{"tag2"},
Address: "10.0.0.2",
Port: 9002,
CreateIndex: 102,
ModifyIndex: 202,
},
{
ID: "id_3",
Namespace: "default",
ServiceName: "s1",
NodeID: nodeID,
Datacenter: "dc1",
JobID: jobID,
AllocID: allocID,
Tags: []string{"tag3"},
Address: "10.0.0.3",
Port: 9003,
CreateIndex: 103,
ModifyIndex: 103,
},
}
require.NoError(t, s.fsm.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services))

serviceRegReq := &structs.ServiceRegistrationByNameRequest{
ServiceName: "s1",
Choose: "2|abc123", // select 2 in consistent order
QueryOptions: structs.QueryOptions{
Namespace: structs.DefaultNamespace,
Region: DefaultRegion,
},
}
var serviceRegResp structs.ServiceRegistrationByNameResponse
err := msgpackrpc.CallWithCodec(
codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq, &serviceRegResp)
require.NoError(t, err)

result := serviceRegResp.Services

require.Len(t, result, 2)
require.Equal(t, "10.0.0.3", result[0].Address)
require.Equal(t, "10.0.0.2", result[1].Address)
},
},
}

for _, tc := range testCases {
Expand All @@ -1184,3 +1262,79 @@ func TestServiceRegistration_GetService(t *testing.T) {
})
}
}

func TestServiceRegistration_chooseErr(t *testing.T) {
ci.Parallel(t)

sr := (*ServiceRegistration)(nil)
try := func(input []*structs.ServiceRegistration, parameter string) {
result, err := sr.choose(input, parameter)
require.Empty(t, result)
require.ErrorIs(t, err, structs.ErrMalformedChooseParameter)
}

regs := []*structs.ServiceRegistration{
{ID: "abc001", ServiceName: "s1"},
{ID: "abc002", ServiceName: "s2"},
{ID: "abc003", ServiceName: "s3"},
}

try(regs, "")
try(regs, "1|")
try(regs, "|abc")
try(regs, "a|abc")
}

func TestServiceRegistration_choose(t *testing.T) {
ci.Parallel(t)

sr := (*ServiceRegistration)(nil)
try := func(input, exp []*structs.ServiceRegistration, parameter string) {
result, err := sr.choose(input, parameter)
require.NoError(t, err)
require.Equal(t, exp, result)
}

// zero services
try(nil, []*structs.ServiceRegistration{}, "1|aaa")
try(nil, []*structs.ServiceRegistration{}, "2|aaa")

// some unique services
regs := []*structs.ServiceRegistration{
{ID: "abc001", ServiceName: "s1"},
{ID: "abc002", ServiceName: "s1"},
{ID: "abc003", ServiceName: "s1"},
}

// same key, increasing n -> maintains order (n=1)
try(regs, []*structs.ServiceRegistration{
{ID: "abc002", ServiceName: "s1"},
}, "1|aaa")

// same key, increasing n -> maintains order (n=2)
try(regs, []*structs.ServiceRegistration{
{ID: "abc002", ServiceName: "s1"},
{ID: "abc003", ServiceName: "s1"},
}, "2|aaa")

// same key, increasing n -> maintains order (n=3)
try(regs, []*structs.ServiceRegistration{
{ID: "abc002", ServiceName: "s1"},
{ID: "abc003", ServiceName: "s1"},
{ID: "abc001", ServiceName: "s1"},
}, "3|aaa")

// unique key -> different orders
try(regs, []*structs.ServiceRegistration{
{ID: "abc001", ServiceName: "s1"},
{ID: "abc002", ServiceName: "s1"},
{ID: "abc003", ServiceName: "s1"},
}, "3|bbb")

// another key -> another order
try(regs, []*structs.ServiceRegistration{
{ID: "abc002", ServiceName: "s1"},
{ID: "abc003", ServiceName: "s1"},
{ID: "abc001", ServiceName: "s1"},
}, "3|ccc")
}
2 changes: 2 additions & 0 deletions nomad/structs/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later"
errMissingAllocID = "Missing allocation ID"
errIncompatibleFiltering = "Filter expression cannot be used with other filter parameters"
errMalformedChooseParameter = "Parameter for choose must be in form '<number>|<key>'"

// Prefix based errors that are used to check if the error is of a given
// type. These errors should be created with the associated constructor.
Expand Down Expand Up @@ -55,6 +56,7 @@ var (
ErrNodeLacksRpc = errors.New(errNodeLacksRpc)
ErrMissingAllocID = errors.New(errMissingAllocID)
ErrIncompatibleFiltering = errors.New(errIncompatibleFiltering)
ErrMalformedChooseParameter = errors.New(errMalformedChooseParameter)

ErrUnknownNode = errors.New(ErrUnknownNodePrefix)

Expand Down
22 changes: 22 additions & 0 deletions nomad/structs/service_registration.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package structs

import (
"crypto/md5"
"encoding/binary"
"fmt"

"github.com/hashicorp/nomad/helper"
Expand Down Expand Up @@ -171,6 +173,25 @@ func (s *ServiceRegistration) GetNamespace() string {
return s.Namespace
}

// HashWith generates a unique value representative of s based on the contents of s.
func (s *ServiceRegistration) HashWith(key string) string {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(s.Port))

sum := md5.New()
sum.Write(buf)
sum.Write([]byte(s.AllocID))
sum.Write([]byte(s.ID))
sum.Write([]byte(s.Namespace))
sum.Write([]byte(s.Address))
sum.Write([]byte(s.ServiceName))
for _, tag := range s.Tags {
sum.Write([]byte(tag))
}
sum.Write([]byte(key))
return fmt.Sprintf("%x", sum.Sum(nil))
}

// ServiceRegistrationUpsertRequest is the request object used to upsert one or
// more service registrations.
type ServiceRegistrationUpsertRequest struct {
Expand Down Expand Up @@ -245,6 +266,7 @@ type ServiceRegistrationStub struct {
// of services matching a specific name.
type ServiceRegistrationByNameRequest struct {
ServiceName string
Choose string // stable selection of n services
QueryOptions
}

Expand Down
Loading

0 comments on commit 56b93a7

Please sign in to comment.