Skip to content

Commit

Permalink
api: enable selecting subset of services using rendezvous hashing
Browse files Browse the repository at this point in the history
This PR adds the 'choose' query parameter to the '/v1/service/<service>' endpoint.

The value of 'choose' is in the form '<number>|<key>', number is the number
of desired services and key is a value unique but consistent to the requester
(e.g. allocID).

Folks aren't really expected to use this API directly, but rather through consul-template
which will soon be getting a new helper function making use of this query parameter.

Example,

curl 'localhost:4646/v1/service/redis?choose=2|abc123'

Note: consul-templte v0.29.1 includes the necessary nomadServices functionality.
  • Loading branch information
shoenig committed Jun 25, 2022
1 parent f5b8506 commit 0fe0b54
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 23 deletions.
4 changes: 2 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"time"

"github.com/gorilla/websocket"
cleanhttp "github.com/hashicorp/go-cleanhttp"
rootcerts "github.com/hashicorp/go-rootcerts"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-rootcerts"
)

var (
Expand Down
26 changes: 8 additions & 18 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -119,31 +118,22 @@ func TestRequestTime(t *testing.T) {

func TestDefaultConfig_env(t *testing.T) {
testutil.Parallel(t)
url := "http://1.2.3.4:5678"
testURL := "http://1.2.3.4:5678"
auth := []string{"nomaduser", "12345"}
region := "test"
namespace := "dev"
token := "foobar"

os.Setenv("NOMAD_ADDR", url)
defer os.Setenv("NOMAD_ADDR", "")

os.Setenv("NOMAD_REGION", region)
defer os.Setenv("NOMAD_REGION", "")

os.Setenv("NOMAD_NAMESPACE", namespace)
defer os.Setenv("NOMAD_NAMESPACE", "")

os.Setenv("NOMAD_HTTP_AUTH", strings.Join(auth, ":"))
defer os.Setenv("NOMAD_HTTP_AUTH", "")

os.Setenv("NOMAD_TOKEN", token)
defer os.Setenv("NOMAD_TOKEN", "")
t.Setenv("NOMAD_ADDR", testURL)
t.Setenv("NOMAD_REGION", region)
t.Setenv("NOMAD_NAMESPACE", namespace)
t.Setenv("NOMAD_HTTP_AUTH", strings.Join(auth, ":"))
t.Setenv("NOMAD_TOKEN", token)

config := DefaultConfig()

if config.Address != url {
t.Errorf("expected %q to be %q", config.Address, url)
if config.Address != testURL {
t.Errorf("expected %q to be %q", config.Address, testURL)
}

if config.Region != region {
Expand Down
5 changes: 4 additions & 1 deletion command/agent/service_registration_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func (s *HTTPServer) ServiceRegistrationRequest(resp http.ResponseWriter, req *h
func (s *HTTPServer) serviceGetRequest(
resp http.ResponseWriter, req *http.Request, serviceName string) (interface{}, error) {

args := structs.ServiceRegistrationByNameRequest{ServiceName: serviceName}
args := structs.ServiceRegistrationByNameRequest{
ServiceName: serviceName,
Choose: req.URL.Query().Get("choose"),
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/gosuri/uilive v0.0.4
github.com/grpc-ecosystem/go-grpc-middleware v1.2.1-0.20200228141219-3ce3d519df39
github.com/hashicorp/consul v1.7.8
github.com/hashicorp/consul-template v0.29.0
github.com/hashicorp/consul-template v0.29.1
github.com/hashicorp/consul/api v1.13.0
github.com/hashicorp/consul/sdk v0.8.0
github.com/hashicorp/cronexpr v1.1.1
Expand Down
79 changes: 78 additions & 1 deletion nomad/service_registration_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package nomad

import (
"net/http"
"sort"
"strconv"
"strings"
"time"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -423,6 +426,16 @@ func (s *ServiceRegistration) GetService(
http.StatusBadRequest, "failed to read result page: %v", err)
}

// Select which subset and the order of services to return if using ?choose
if args.Choose != "" {
chosen, chooseErr := s.choose(services, args.Choose)
if chooseErr != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to choose services: %v", chooseErr)
}
services = chosen
}

// Populate the reply.
reply.Services = services
reply.NextToken = nextToken
Expand All @@ -434,6 +447,70 @@ func (s *ServiceRegistration) GetService(
})
}

// choose uses rendezvous hashing to make a stable selection of a subset of services
// to return.
//
// parameter must in the form "<number>|<key>", where number is the number of services
// to select, and key is incorporated in the hashing function with each service -
// creating a unique yet consistent priority distribution pertaining to the requester.
// In practice (i.e. via consul-template), the key is the AllocID generating a request
// for upstream services.
//
// https://en.wikipedia.org/wiki/Rendezvous_hashing
// w := priority (i.e. hash value)
// h := hash function
// O := object - (i.e. requesting service - using key (allocID) as a proxy)
// S := site (i.e. destination service)
func (*ServiceRegistration) choose(services []*structs.ServiceRegistration, parameter string) ([]*structs.ServiceRegistration, error) {
// extract the number of services
tokens := strings.SplitN(parameter, "|", 2)
if len(tokens) != 2 {
return nil, structs.ErrMalformedChooseParameter
}
n, err := strconv.Atoi(tokens[0])
if err != nil {
return nil, structs.ErrMalformedChooseParameter
}

// extract the hash key
key := tokens[1]
if key == "" {
return nil, structs.ErrMalformedChooseParameter
}

// if there are fewer services than requested, go with the number of services
if l := len(services); l < n {
n = l
}

type pair struct {
hash string
service *structs.ServiceRegistration
}

// associate hash for each service
priorities := make([]*pair, len(services))
for i, service := range services {
priorities[i] = &pair{
hash: service.HashWith(key),
service: service,
}
}

// sort by the hash; creating random distribution of priority
sort.SliceStable(priorities, func(i, j int) bool {
return priorities[i].hash < priorities[j].hash
})

// choose top n services
chosen := make([]*structs.ServiceRegistration, n)
for i := 0; i < n; i++ {
chosen[i] = priorities[i].service
}

return chosen, nil
}

// handleMixedAuthEndpoint is a helper to handle auth on RPC endpoints that can
// either be called by Nomad nodes, or by external clients.
func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions, cap string) error {
Expand All @@ -451,7 +528,7 @@ func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions,
}
}
default:
// In the event we got any error other than notfound, consider this
// In the event we got any error other than ErrTokenNotFound, consider this
// terminal.
if err != structs.ErrTokenNotFound {
return err
Expand Down
154 changes: 154 additions & 0 deletions nomad/service_registration_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,84 @@ func TestServiceRegistration_GetService(t *testing.T) {
},
name: "filtering and pagination",
},
{
name: "choose 2",
serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) {
server, cleanup := TestServer(t, nil)
return server, nil, cleanup
},
testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)

// insert 3 instances of service s1
nodeID, jobID, allocID := "node_id", "job_id", "alloc_id"
services := []*structs.ServiceRegistration{
{
ID: "id_1",
Namespace: "default",
ServiceName: "s1",
NodeID: nodeID,
Datacenter: "dc1",
JobID: jobID,
AllocID: allocID,
Tags: []string{"tag1"},
Address: "10.0.0.1",
Port: 9001,
CreateIndex: 101,
ModifyIndex: 201,
},
{
ID: "id_2",
Namespace: "default",
ServiceName: "s1",
NodeID: nodeID,
Datacenter: "dc1",
JobID: jobID,
AllocID: allocID,
Tags: []string{"tag2"},
Address: "10.0.0.2",
Port: 9002,
CreateIndex: 102,
ModifyIndex: 202,
},
{
ID: "id_3",
Namespace: "default",
ServiceName: "s1",
NodeID: nodeID,
Datacenter: "dc1",
JobID: jobID,
AllocID: allocID,
Tags: []string{"tag3"},
Address: "10.0.0.3",
Port: 9003,
CreateIndex: 103,
ModifyIndex: 103,
},
}
require.NoError(t, s.fsm.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services))

serviceRegReq := &structs.ServiceRegistrationByNameRequest{
ServiceName: "s1",
Choose: "2|abc123", // select 2 in consistent order
QueryOptions: structs.QueryOptions{
Namespace: structs.DefaultNamespace,
Region: DefaultRegion,
},
}
var serviceRegResp structs.ServiceRegistrationByNameResponse
err := msgpackrpc.CallWithCodec(
codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq, &serviceRegResp)
require.NoError(t, err)

result := serviceRegResp.Services

require.Len(t, result, 2)
require.Equal(t, "10.0.0.3", result[0].Address)
require.Equal(t, "10.0.0.2", result[1].Address)
},
},
}

for _, tc := range testCases {
Expand All @@ -1184,3 +1262,79 @@ func TestServiceRegistration_GetService(t *testing.T) {
})
}
}

func TestServiceRegistration_chooseErr(t *testing.T) {
ci.Parallel(t)

sr := (*ServiceRegistration)(nil)
try := func(input []*structs.ServiceRegistration, parameter string) {
result, err := sr.choose(input, parameter)
require.Empty(t, result)
require.ErrorIs(t, err, structs.ErrMalformedChooseParameter)
}

regs := []*structs.ServiceRegistration{
{ID: "abc001", ServiceName: "s1"},
{ID: "abc002", ServiceName: "s2"},
{ID: "abc003", ServiceName: "s3"},
}

try(regs, "")
try(regs, "1|")
try(regs, "|abc")
try(regs, "a|abc")
}

func TestServiceRegistration_choose(t *testing.T) {
ci.Parallel(t)

sr := (*ServiceRegistration)(nil)
try := func(input, exp []*structs.ServiceRegistration, parameter string) {
result, err := sr.choose(input, parameter)
require.NoError(t, err)
require.Equal(t, exp, result)
}

// zero services
try(nil, []*structs.ServiceRegistration{}, "1|aaa")
try(nil, []*structs.ServiceRegistration{}, "2|aaa")

// some unique services
regs := []*structs.ServiceRegistration{
{ID: "abc001", ServiceName: "s1"},
{ID: "abc002", ServiceName: "s1"},
{ID: "abc003", ServiceName: "s1"},
}

// same key, increasing n -> maintains order (n=1)
try(regs, []*structs.ServiceRegistration{
{ID: "abc002", ServiceName: "s1"},
}, "1|aaa")

// same key, increasing n -> maintains order (n=2)
try(regs, []*structs.ServiceRegistration{
{ID: "abc002", ServiceName: "s1"},
{ID: "abc003", ServiceName: "s1"},
}, "2|aaa")

// same key, increasing n -> maintains order (n=3)
try(regs, []*structs.ServiceRegistration{
{ID: "abc002", ServiceName: "s1"},
{ID: "abc003", ServiceName: "s1"},
{ID: "abc001", ServiceName: "s1"},
}, "3|aaa")

// unique key -> different orders
try(regs, []*structs.ServiceRegistration{
{ID: "abc001", ServiceName: "s1"},
{ID: "abc002", ServiceName: "s1"},
{ID: "abc003", ServiceName: "s1"},
}, "3|bbb")

// another key -> another order
try(regs, []*structs.ServiceRegistration{
{ID: "abc002", ServiceName: "s1"},
{ID: "abc003", ServiceName: "s1"},
{ID: "abc001", ServiceName: "s1"},
}, "3|ccc")
}
2 changes: 2 additions & 0 deletions nomad/structs/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later"
errMissingAllocID = "Missing allocation ID"
errIncompatibleFiltering = "Filter expression cannot be used with other filter parameters"
errMalformedChooseParameter = "Parameter for choose must be in form '<number>|<key>'"

// Prefix based errors that are used to check if the error is of a given
// type. These errors should be created with the associated constructor.
Expand Down Expand Up @@ -55,6 +56,7 @@ var (
ErrNodeLacksRpc = errors.New(errNodeLacksRpc)
ErrMissingAllocID = errors.New(errMissingAllocID)
ErrIncompatibleFiltering = errors.New(errIncompatibleFiltering)
ErrMalformedChooseParameter = errors.New(errMalformedChooseParameter)

ErrUnknownNode = errors.New(ErrUnknownNodePrefix)

Expand Down
Loading

0 comments on commit 0fe0b54

Please sign in to comment.