Skip to content

Commit

Permalink
Merge pull request #9312 from hashicorp/b-9227-scaling-policy-filtering
Browse files Browse the repository at this point in the history
use both job and type query on scaling policy list endpoint
  • Loading branch information
cgbaker committed Nov 11, 2020
2 parents 306cfab + d25df37 commit 801911f
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 25 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ __BACKWARDS INCOMPATIBILITIES:__
BUG FIXES:

* agent (Enterprise): Fixed a bug where audit logging caused websocket and streaming http endpoints to fail [[GH-9319](https://github.com/hashicorp/nomad/issues/9319)]
* core: Fixed a bug where blocking queries would not include the query's maximum wait time when calculating whether it was safe to retry. [[GH-8921](https://github.com/hashicorp/nomad/issues/8921)]
* core: Fixed a bug where ACL handling prevented cross-namespace allocation listing [[GH-9278](https://github.com/hashicorp/nomad/issues/9278)]
* core: Fixed a bug where scaling policy filtering would ignore type query if job query was present [[GH-9312](https://github.com/hashicorp/nomad/issues/9312)]
* core: Fixed a bug where a request to scale a job would fail if the job was not in the default namespace. [[GH-9296](https://github.com/hashicorp/nomad/pull/9296)]
* core: Fixed a bug where blocking queries would not include the query's maximum wait time when calculating whether it was safe to retry. [[GH-8921](https://github.com/hashicorp/nomad/issues/8921)]
* config (Enterprise): Fixed default enterprise config merging. [[GH-9083](https://github.com/hashicorp/nomad/pull/9083)]
* client: Fixed an issue with the Java fingerprinter on macOS causing pop-up notifications when no JVM installed. [[GH-9225](https://github.com/hashicorp/nomad/pull/9225)]
* client: Fixed an fingerprinter issue detecting bridge kernel module [[GH-9299](https://github.com/hashicorp/nomad/pull/9299)]
Expand Down
2 changes: 1 addition & 1 deletion nomad/scaling_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (p *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, reply *st
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = state.ScalingPoliciesByIDPrefix(ws, args.RequestNamespace(), prefix)
} else if job := args.Job; job != "" {
iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job)
iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job, args.Type)
} else {
iter, err = state.ScalingPoliciesByNamespace(ws, args.Namespace, args.Type)
}
Expand Down
98 changes: 80 additions & 18 deletions nomad/scaling_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package nomad

import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
"testing"
"time"

msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
Expand Down Expand Up @@ -125,33 +126,94 @@ func TestScalingEndpoint_GetPolicy_ACL(t *testing.T) {

func TestScalingEndpoint_ListPolicies(t *testing.T) {
t.Parallel()
require := require.New(t)

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Lookup the policies
get := &structs.ScalingPolicyListRequest{
var resp structs.ScalingPolicyListResponse
err := msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Region: "global",
Namespace: "default",
},
}, &resp)
require.NoError(t, err)
require.Empty(t, resp.Policies)

j1 := mock.Job()
j1polV := mock.ScalingPolicy()
j1polV.Type = "vertical-cpu"
j1polV.TargetTask(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0])
j1polH := mock.ScalingPolicy()
j1polH.Type = "horizontal"
j1polH.TargetTaskGroup(j1, j1.TaskGroups[0])

j2 := mock.Job()
j2polH := mock.ScalingPolicy()
j2polH.Type = "horizontal"
j2polH.TargetTaskGroup(j2, j2.TaskGroups[0])

s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, j1)
s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, j2)

pols := []*structs.ScalingPolicy{j1polV, j1polH, j2polH}
s1.fsm.State().UpsertScalingPolicies(1000, pols)
for _, p := range pols {
p.ModifyIndex = 1000
p.CreateIndex = 1000
}
var resp structs.ACLPolicyListResponse
err := msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", get, &resp)
require.NoError(err)
require.Empty(resp.Policies)

p1 := mock.ScalingPolicy()
p2 := mock.ScalingPolicy()
s1.fsm.State().UpsertScalingPolicies(1000, []*structs.ScalingPolicy{p1, p2})
cases := []struct {
Label string
Job string
Type string
Expected []*structs.ScalingPolicy
}{
{
Label: "all policies",
Expected: []*structs.ScalingPolicy{j1polH, j1polV, j2polH},
},
{
Label: "job filter",
Job: j1.ID,
Expected: []*structs.ScalingPolicy{j1polH, j1polV},
},
{
Label: "type filter",
Type: "horizontal",
Expected: []*structs.ScalingPolicy{j1polH, j2polH},
},
{
Label: "job and type",
Job: j1.ID,
Type: "horizontal",
Expected: []*structs.ScalingPolicy{j1polH},
},
}

err = msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", get, &resp)
require.NoError(err)
require.EqualValues(1000, resp.Index)
require.Len(resp.Policies, 2)
for _, tc := range cases {
t.Run(tc.Label, func(t *testing.T) {
get := &structs.ScalingPolicyListRequest{
Job: tc.Job,
Type: tc.Type,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "default",
},
}
var resp structs.ScalingPolicyListResponse
err = msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", get, &resp)
require.NoError(t, err)
stubs := []*structs.ScalingPolicyListStub{}
for _, p := range tc.Expected {
stubs = append(stubs, p.Stub())
}
require.ElementsMatch(t, stubs, resp.Policies)
})
}
}

func TestScalingEndpoint_ListPolicies_ACL(t *testing.T) {
Expand All @@ -170,7 +232,7 @@ func TestScalingEndpoint_ListPolicies_ACL(t *testing.T) {

get := &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Region: "global",
Namespace: "default",
},
}
Expand Down Expand Up @@ -263,7 +325,7 @@ func TestScalingEndpoint_ListPolicies_Blocking(t *testing.T) {
req := &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "default",
Namespace: "default",
MinQueryIndex: 150,
},
}
Expand Down
22 changes: 20 additions & 2 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5760,9 +5760,27 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace, ty
return iter, nil
}

func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) {
func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID, policyType string) (memdb.ResultIterator,
error) {
txn := s.db.ReadTxn()
return s.ScalingPoliciesByJobTxn(ws, namespace, jobID, txn)
iter, err := s.ScalingPoliciesByJobTxn(ws, namespace, jobID, txn)
if err != nil {
return nil, err
}

if policyType == "" {
return iter, nil
}

filter := func(raw interface{}) bool {
p, ok := raw.(*structs.ScalingPolicy)
if !ok {
return true
}
return policyType != p.Type
}

return memdb.NewFilterIterator(iter, filter), nil
}

func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID string,
Expand Down
6 changes: 3 additions & 3 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9201,7 +9201,7 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) {

iter, err := state.ScalingPoliciesByJob(nil,
policyA.Target[structs.ScalingTargetNamespace],
policyA.Target[structs.ScalingTargetJob])
policyA.Target[structs.ScalingTargetJob], "")
require.NoError(err)

// Ensure we see expected policies
Expand All @@ -9223,7 +9223,7 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) {

iter, err = state.ScalingPoliciesByJob(nil,
policyB1.Target[structs.ScalingTargetNamespace],
policyB1.Target[structs.ScalingTargetJob])
policyB1.Target[structs.ScalingTargetJob], "")
require.NoError(err)

// Ensure we see expected policies
Expand Down Expand Up @@ -9267,7 +9267,7 @@ func TestStateStore_ScalingPoliciesByJob_PrefixBug(t *testing.T) {

iter, err := state.ScalingPoliciesByJob(nil,
policy1.Target[structs.ScalingTargetNamespace],
jobPrefix)
jobPrefix, "")
require.NoError(err)

// Ensure we see expected policies
Expand Down

0 comments on commit 801911f

Please sign in to comment.