Skip to content

Commit

Permalink
api: enable selecting subset of services using rendezvous hashing
Browse files Browse the repository at this point in the history
This PR adds the 'choose' query parameter to the '/v1/service/<service>' endpoint.

The value of 'choose' is in the form '<number>|<key>', number is the number
of desired services and key is a value unique but consistent to the requester
(e.g. allocID).

Folks aren't really expected to use this API directly, but rather through consul-template
which will soon be getting a new helper function making use of this query parameter.

Example,

curl 'localhost:4646/v1/service/redis?choose=2|abc123'

Note: consul-templte v0.29.1 includes the necessary nomadServices functionality.
  • Loading branch information
shoenig committed Jun 25, 2022
1 parent f5b8506 commit 671193b
Show file tree
Hide file tree
Showing 12 changed files with 470 additions and 31 deletions.
3 changes: 3 additions & 0 deletions .changelog/12862.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
api: enable setting ?choose parameter when querying services
```
4 changes: 2 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"time"

"github.com/gorilla/websocket"
cleanhttp "github.com/hashicorp/go-cleanhttp"
rootcerts "github.com/hashicorp/go-rootcerts"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-rootcerts"
)

var (
Expand Down
26 changes: 8 additions & 18 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -119,31 +118,22 @@ func TestRequestTime(t *testing.T) {

func TestDefaultConfig_env(t *testing.T) {
testutil.Parallel(t)
url := "http://1.2.3.4:5678"
testURL := "http://1.2.3.4:5678"
auth := []string{"nomaduser", "12345"}
region := "test"
namespace := "dev"
token := "foobar"

os.Setenv("NOMAD_ADDR", url)
defer os.Setenv("NOMAD_ADDR", "")

os.Setenv("NOMAD_REGION", region)
defer os.Setenv("NOMAD_REGION", "")

os.Setenv("NOMAD_NAMESPACE", namespace)
defer os.Setenv("NOMAD_NAMESPACE", "")

os.Setenv("NOMAD_HTTP_AUTH", strings.Join(auth, ":"))
defer os.Setenv("NOMAD_HTTP_AUTH", "")

os.Setenv("NOMAD_TOKEN", token)
defer os.Setenv("NOMAD_TOKEN", "")
t.Setenv("NOMAD_ADDR", testURL)
t.Setenv("NOMAD_REGION", region)
t.Setenv("NOMAD_NAMESPACE", namespace)
t.Setenv("NOMAD_HTTP_AUTH", strings.Join(auth, ":"))
t.Setenv("NOMAD_TOKEN", token)

config := DefaultConfig()

if config.Address != url {
t.Errorf("expected %q to be %q", config.Address, url)
if config.Address != testURL {
t.Errorf("expected %q to be %q", config.Address, testURL)
}

if config.Region != region {
Expand Down
5 changes: 4 additions & 1 deletion command/agent/service_registration_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func (s *HTTPServer) ServiceRegistrationRequest(resp http.ResponseWriter, req *h
func (s *HTTPServer) serviceGetRequest(
resp http.ResponseWriter, req *http.Request, serviceName string) (interface{}, error) {

args := structs.ServiceRegistrationByNameRequest{ServiceName: serviceName}
args := structs.ServiceRegistrationByNameRequest{
ServiceName: serviceName,
Choose: req.URL.Query().Get("choose"),
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
Expand Down
105 changes: 98 additions & 7 deletions command/agent/service_registration_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -157,6 +158,7 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
name string
}{
{
name: "delete by ID",
testFn: func(s *TestAgent) {

// Grab the state, so we can manipulate it and test against it.
Expand Down Expand Up @@ -186,9 +188,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Nil(t, out)
require.NoError(t, err)
},
name: "delete by ID",
},
{
name: "get service by name",
testFn: func(s *TestAgent) {

// Grab the state, so we can manipulate it and test against it.
Expand All @@ -214,9 +216,99 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.NotZero(t, respW.Header().Get("X-Nomad-Index"))
require.Equal(t, serviceReg, obj.([]*structs.ServiceRegistration)[0])
},
name: "get service by name",
},
{
name: "get service using choose",
testFn: func(s *TestAgent) {
// Grab the state so we can manipulate and test against it.
testState := s.Agent.server.State()

err := testState.UpsertServiceRegistrations(
structs.MsgTypeTestSetup, 10,
[]*structs.ServiceRegistration{{
ID: "978d519a-46ad-fb04-966b-000000000001",
ServiceName: "redis",
Namespace: "default",
NodeID: "node1",
Datacenter: "dc1",
JobID: "job1",
AllocID: "8b83191f-cb29-e23a-d955-220b65ef676d",
Tags: nil,
Address: "10.0.0.1",
Port: 8080,
CreateIndex: 10,
ModifyIndex: 10,
}, {
ID: "978d519a-46ad-fb04-966b-000000000002",
ServiceName: "redis",
Namespace: "default",
NodeID: "node2",
Datacenter: "dc1",
JobID: "job1",
AllocID: "df6de93c-9376-a774-bcdf-3bd817e18078",
Tags: nil,
Address: "10.0.0.2",
Port: 8080,
CreateIndex: 10,
ModifyIndex: 10,
}, {
ID: "978d519a-46ad-fb04-966b-000000000003",
ServiceName: "redis",
Namespace: "default",
NodeID: "node3",
Datacenter: "dc1",
JobID: "job1",
AllocID: "df6de93c-9376-a774-bcdf-3bd817e18078",
Tags: nil,
Address: "10.0.0.3",
Port: 8080,
CreateIndex: 10,
ModifyIndex: 10,
}},
)
must.NoError(t, err)

// Build the HTTP request for 1 instance of the service, using key=abc123
req, err := http.NewRequest(http.MethodGet, "/v1/service/redis?choose=1|abc123", nil)
must.NoError(t, err)
respW := httptest.NewRecorder()

// Send the HTTP request.
obj, err := s.Server.ServiceRegistrationRequest(respW, req)
must.NoError(t, err)

// Check we got the correct type back.
services, ok := (obj).([]*structs.ServiceRegistration)
must.True(t, ok)

// Check we got the expected number of services back.
must.Len(t, 1, services)

// Build the HTTP request for 2 instances of the service, still using key=abc123
req2, err := http.NewRequest(http.MethodGet, "/v1/service/redis?choose=2|abc123", nil)
must.NoError(t, err)
respW2 := httptest.NewRecorder()

// Send the 2nd HTTP request.
obj2, err := s.Server.ServiceRegistrationRequest(respW2, req2)
must.NoError(t, err)

// Check we got the correct type back.
services2, ok := (obj2).([]*structs.ServiceRegistration)
must.True(t, ok)

// Check we got the expected number of services back.
must.Len(t, 2, services2)

// Check the first service is the same as the previous service.
must.Eq(t, services[0], services2[0])

// Check the second service is not the same as the first service.
must.NotEq(t, services2[0], services2[1])
},
},
{
name: "incorrect URI format",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -230,9 +322,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "invalid URI")
require.Nil(t, obj)
},
name: "incorrect URI format",
},
{
name: "get service empty name",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -246,9 +338,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "missing service name")
require.Nil(t, obj)
},
name: "get service empty name",
},
{
name: "get service incorrect method",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -262,9 +354,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "Invalid method")
require.Nil(t, obj)
},
name: "get service incorrect method",
},
{
name: "delete service empty id",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -278,9 +370,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "missing service id")
require.Nil(t, obj)
},
name: "delete service empty id",
},
{
name: "delete service incorrect method",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -294,7 +386,6 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "Invalid method")
require.Nil(t, obj)
},
name: "delete service incorrect method",
},
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/gosuri/uilive v0.0.4
github.com/grpc-ecosystem/go-grpc-middleware v1.2.1-0.20200228141219-3ce3d519df39
github.com/hashicorp/consul v1.7.8
github.com/hashicorp/consul-template v0.29.0
github.com/hashicorp/consul-template v0.29.1
github.com/hashicorp/consul/api v1.13.0
github.com/hashicorp/consul/sdk v0.8.0
github.com/hashicorp/cronexpr v1.1.1
Expand Down
79 changes: 78 additions & 1 deletion nomad/service_registration_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package nomad

import (
"net/http"
"sort"
"strconv"
"strings"
"time"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -423,6 +426,16 @@ func (s *ServiceRegistration) GetService(
http.StatusBadRequest, "failed to read result page: %v", err)
}

// Select which subset and the order of services to return if using ?choose
if args.Choose != "" {
chosen, chooseErr := s.choose(services, args.Choose)
if chooseErr != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to choose services: %v", chooseErr)
}
services = chosen
}

// Populate the reply.
reply.Services = services
reply.NextToken = nextToken
Expand All @@ -434,6 +447,70 @@ func (s *ServiceRegistration) GetService(
})
}

// choose uses rendezvous hashing to make a stable selection of a subset of services
// to return.
//
// parameter must in the form "<number>|<key>", where number is the number of services
// to select, and key is incorporated in the hashing function with each service -
// creating a unique yet consistent priority distribution pertaining to the requester.
// In practice (i.e. via consul-template), the key is the AllocID generating a request
// for upstream services.
//
// https://en.wikipedia.org/wiki/Rendezvous_hashing
// w := priority (i.e. hash value)
// h := hash function
// O := object - (i.e. requesting service - using key (allocID) as a proxy)
// S := site (i.e. destination service)
func (*ServiceRegistration) choose(services []*structs.ServiceRegistration, parameter string) ([]*structs.ServiceRegistration, error) {
// extract the number of services
tokens := strings.SplitN(parameter, "|", 2)
if len(tokens) != 2 {
return nil, structs.ErrMalformedChooseParameter
}
n, err := strconv.Atoi(tokens[0])
if err != nil {
return nil, structs.ErrMalformedChooseParameter
}

// extract the hash key
key := tokens[1]
if key == "" {
return nil, structs.ErrMalformedChooseParameter
}

// if there are fewer services than requested, go with the number of services
if l := len(services); l < n {
n = l
}

type pair struct {
hash string
service *structs.ServiceRegistration
}

// associate hash for each service
priorities := make([]*pair, len(services))
for i, service := range services {
priorities[i] = &pair{
hash: service.HashWith(key),
service: service,
}
}

// sort by the hash; creating random distribution of priority
sort.SliceStable(priorities, func(i, j int) bool {
return priorities[i].hash < priorities[j].hash
})

// choose top n services
chosen := make([]*structs.ServiceRegistration, n)
for i := 0; i < n; i++ {
chosen[i] = priorities[i].service
}

return chosen, nil
}

// handleMixedAuthEndpoint is a helper to handle auth on RPC endpoints that can
// either be called by Nomad nodes, or by external clients.
func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions, cap string) error {
Expand All @@ -451,7 +528,7 @@ func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions,
}
}
default:
// In the event we got any error other than notfound, consider this
// In the event we got any error other than ErrTokenNotFound, consider this
// terminal.
if err != structs.ErrTokenNotFound {
return err
Expand Down
Loading

0 comments on commit 671193b

Please sign in to comment.