From 42e5b5bb591efcf43acde5699ac5d37af7a357b4 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Tue, 10 Nov 2020 21:14:04 +0000 Subject: [PATCH] fix #9227: use both job and type query on scaling policy list endpoint --- CHANGELOG.md | 3 +- nomad/scaling_endpoint.go | 2 +- nomad/scaling_endpoint_test.go | 98 +++++++++++++++++++++++++++------- nomad/state/state_store.go | 22 +++++++- 4 files changed, 103 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7270ead4496..968a92a8cda0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,9 +38,10 @@ __BACKWARDS INCOMPATIBILITIES:__ BUG FIXES: - * core: Fixed a bug where blocking queries would not include the query's maximum wait time when calculating whether it was safe to retry. [[GH-8921](https://github.com/hashicorp/nomad/issues/8921)] * core: Fixed a bug where ACL handling prevented cross-namespace allocation listing [[GH-9278](https://github.com/hashicorp/nomad/issues/9278)] + * core: Fixed a bug where scaling policy filtering would ignore type query if job query was present [[GH-9312](https://github.com/hashicorp/nomad/issues/9312)] * core: Fixed a bug where a request to scale a job would fail if the job was not in the default namespace. [[GH-9296](https://github.com/hashicorp/nomad/pull/9296)] + * core: Fixed a bug where blocking queries would not include the query's maximum wait time when calculating whether it was safe to retry. [[GH-8921](https://github.com/hashicorp/nomad/issues/8921)] * config (Enterprise): Fixed default enterprise config merging. [[GH-9083](https://github.com/hashicorp/nomad/pull/9083)] * client: Fixed an issue with the Java fingerprinter on macOS causing pop-up notifications when no JVM installed. [[GH-9225](https://github.com/hashicorp/nomad/pull/9225)] * client: Fixed an fingerprinter issue detecting bridge kernel module [[GH-9299](https://github.com/hashicorp/nomad/pull/9299)] diff --git a/nomad/scaling_endpoint.go b/nomad/scaling_endpoint.go index 98da2ac53985..c3363e45a92b 100644 --- a/nomad/scaling_endpoint.go +++ b/nomad/scaling_endpoint.go @@ -54,7 +54,7 @@ func (p *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, reply *st if prefix := args.QueryOptions.Prefix; prefix != "" { iter, err = state.ScalingPoliciesByIDPrefix(ws, args.RequestNamespace(), prefix) } else if job := args.Job; job != "" { - iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job) + iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job, args.Type) } else { iter, err = state.ScalingPoliciesByNamespace(ws, args.Namespace, args.Type) } diff --git a/nomad/scaling_endpoint_test.go b/nomad/scaling_endpoint_test.go index 8dc2b7a29a84..a4f83b1f3390 100644 --- a/nomad/scaling_endpoint_test.go +++ b/nomad/scaling_endpoint_test.go @@ -1,11 +1,12 @@ package nomad import ( - msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" - "github.com/stretchr/testify/require" "testing" "time" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -125,7 +126,6 @@ func TestScalingEndpoint_GetPolicy_ACL(t *testing.T) { func TestScalingEndpoint_ListPolicies(t *testing.T) { t.Parallel() - require := require.New(t) s1, cleanupS1 := TestServer(t, nil) defer cleanupS1() @@ -133,25 +133,87 @@ func TestScalingEndpoint_ListPolicies(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) // Lookup the policies - get := &structs.ScalingPolicyListRequest{ + var resp structs.ScalingPolicyListResponse + err := msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", &structs.ScalingPolicyListRequest{ QueryOptions: structs.QueryOptions{ - Region: "global", + Region: "global", Namespace: "default", }, + }, &resp) + require.NoError(t, err) + require.Empty(t, resp.Policies) + + j1 := mock.Job() + j1polV := mock.ScalingPolicy() + j1polV.Type = "vertical-cpu" + j1polV.TargetTask(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0]) + j1polH := mock.ScalingPolicy() + j1polH.Type = "horizontal" + j1polH.TargetTaskGroup(j1, j1.TaskGroups[0]) + + j2 := mock.Job() + j2polH := mock.ScalingPolicy() + j2polH.Type = "horizontal" + j2polH.TargetTaskGroup(j2, j2.TaskGroups[0]) + + s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, j1) + s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, j2) + + pols := []*structs.ScalingPolicy{j1polV, j1polH, j2polH} + s1.fsm.State().UpsertScalingPolicies(1000, pols) + for _, p := range pols { + p.ModifyIndex = 1000 + p.CreateIndex = 1000 } - var resp structs.ACLPolicyListResponse - err := msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", get, &resp) - require.NoError(err) - require.Empty(resp.Policies) - p1 := mock.ScalingPolicy() - p2 := mock.ScalingPolicy() - s1.fsm.State().UpsertScalingPolicies(1000, []*structs.ScalingPolicy{p1, p2}) + cases := []struct { + Label string + Job string + Type string + Expected []*structs.ScalingPolicy + }{ + { + Label: "all policies", + Expected: []*structs.ScalingPolicy{j1polH, j1polV, j2polH}, + }, + { + Label: "job filter", + Job: j1.ID, + Expected: []*structs.ScalingPolicy{j1polH, j1polV}, + }, + { + Label: "type filter", + Type: "horizontal", + Expected: []*structs.ScalingPolicy{j1polH, j2polH}, + }, + { + Label: "job and type", + Job: j1.ID, + Type: "horizontal", + Expected: []*structs.ScalingPolicy{j1polH}, + }, + } - err = msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", get, &resp) - require.NoError(err) - require.EqualValues(1000, resp.Index) - require.Len(resp.Policies, 2) + for _, tc := range cases { + t.Run(tc.Label, func(t *testing.T) { + get := &structs.ScalingPolicyListRequest{ + Job: tc.Job, + Type: tc.Type, + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: "default", + }, + } + var resp structs.ScalingPolicyListResponse + err = msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", get, &resp) + require.NoError(t, err) + stubs := []*structs.ScalingPolicyListStub{} + for _, p := range tc.Expected { + stubs = append(stubs, p.Stub()) + } + require.ElementsMatch(t, stubs, resp.Policies) + }) + } } func TestScalingEndpoint_ListPolicies_ACL(t *testing.T) { @@ -170,7 +232,7 @@ func TestScalingEndpoint_ListPolicies_ACL(t *testing.T) { get := &structs.ScalingPolicyListRequest{ QueryOptions: structs.QueryOptions{ - Region: "global", + Region: "global", Namespace: "default", }, } @@ -263,7 +325,7 @@ func TestScalingEndpoint_ListPolicies_Blocking(t *testing.T) { req := &structs.ScalingPolicyListRequest{ QueryOptions: structs.QueryOptions{ Region: "global", - Namespace: "default", + Namespace: "default", MinQueryIndex: 150, }, } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 32a873ebe83c..09fa5db9b87e 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -5760,9 +5760,27 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace, ty return iter, nil } -func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) { +func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID, policyType string) (memdb.ResultIterator, + error) { txn := s.db.ReadTxn() - return s.ScalingPoliciesByJobTxn(ws, namespace, jobID, txn) + iter, err := s.ScalingPoliciesByJobTxn(ws, namespace, jobID, txn) + if err != nil { + return nil, err + } + + if policyType == "" { + return iter, nil + } + + filter := func(raw interface{}) bool { + p, ok := raw.(*structs.ScalingPolicy) + if !ok { + return true + } + return policyType != p.Type + } + + return memdb.NewFilterIterator(iter, filter), nil } func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID string,