resolve prefix bugs around job scaling policies #8753

Merged · 2 commits · Aug 27, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ IMPROVEMENTS:
 BUG FIXES:
 
 * core: Fixed a bug where unpromoted job versions are used when rescheduling failed allocations [[GH-8691](https://github.com/hashicorp/nomad/issues/8691)]
+* core: Fixed bugs where scaling policies could be matched against incorrect jobs with a similar prefix [[GH-8753](https://github.com/hashicorp/nomad/issues/8753)]
 
 ## 0.12.3 (August 13, 2020)
 
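For context on the bug the CHANGELOG entry describes: the state store looked up and deleted a job's scaling policies through go-memdb's `target_prefix` index variant, which matches any entry whose indexed target key merely starts with the supplied namespace and job ID, so a job whose ID is a prefix of another job's ID collided with it. The following is a stand-alone illustration of that failure mode (not code from this PR; the job names are made up):

```go
package main

import (
    "fmt"
    "strings"
)

// Illustration only: the scaling_policy lookups used a prefix match on the
// indexed target key, so a job whose ID is a prefix of another job's ID also
// matched that other job's policies.
func main() {
    requested := "web"    // job whose scaling policies we want
    other := "web-canary" // unrelated job that happens to share the prefix

    // A prefix-based lookup keyed on "web" also matches "web-canary",
    // which is how policies could be read or deleted for the wrong job.
    fmt.Println(strings.HasPrefix(other, requested)) // true
}
```

The changes to nomad/state/state_store.go below therefore stop relying on the prefix scan alone and add an exact-match filter on top of it.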
3 changes: 3 additions & 0 deletions nomad/scaling_endpoint_test.go
@@ -136,6 +136,7 @@ func TestScalingEndpoint_ListPolicies(t *testing.T) {
     get := &structs.ScalingPolicyListRequest{
         QueryOptions: structs.QueryOptions{
             Region: "global",
+            Namespace: "default",
         },
     }
     var resp structs.ACLPolicyListResponse
@@ -170,6 +171,7 @@ func TestScalingEndpoint_ListPolicies_ACL(t *testing.T) {
     get := &structs.ScalingPolicyListRequest{
         QueryOptions: structs.QueryOptions{
             Region: "global",
+            Namespace: "default",
         },
     }
 
@@ -261,6 +263,7 @@ func TestScalingEndpoint_ListPolicies_Blocking(t *testing.T) {
     req := &structs.ScalingPolicyListRequest{
         QueryOptions: structs.QueryOptions{
             Region: "global",
+            Namespace: "default",
             MinQueryIndex: 150,
         },
     }
53 changes: 48 additions & 5 deletions nomad/state/state_store.go
@@ -1539,11 +1539,30 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
 
 // deleteJobScalingPolicies deletes any scaling policies associated with the job
 func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error {
-    numDeletedScalingPolicies, err := txn.DeleteAll("scaling_policy", "target_prefix", job.Namespace, job.ID)
+    iter, err := s.ScalingPoliciesByJobTxn(nil, job.Namespace, job.ID, txn)
     if err != nil {
-        return fmt.Errorf("deleting job scaling policies failed: %v", err)
+        return fmt.Errorf("getting job scaling policies for deletion failed: %v", err)
     }
+
+    // Put them into a slice so there are no safety concerns while actually
+    // performing the deletes
+    policies := []interface{}{}
+    for {
+        raw := iter.Next()
+        if raw == nil {
+            break
+        }
+        policies = append(policies, raw)
+    }
+
+    // Do the deletes
+    for _, p := range policies {
+        if err := txn.Delete("scaling_policy", p); err != nil {
+            return fmt.Errorf("deleting scaling policy failed: %v", err)
+        }
+    }
-    if numDeletedScalingPolicies > 0 {
+
+    if len(policies) > 0 {
         if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
             return fmt.Errorf("index update failed: %v", err)
         }
@@ -5332,7 +5351,19 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str
     }
 
     ws.Add(iter.WatchCh())
-    return iter, nil
+
+    filter := func(raw interface{}) bool {
+        d, ok := raw.(*structs.ScalingPolicy)
+        if !ok {
+            return true
+        }
+
+        return d.Target[structs.ScalingTargetNamespace] != namespace
+    }
+
+    // Wrap the iterator in a filter
+    wrap := memdb.NewFilterIterator(iter, filter)
+    return wrap, nil
 }
 
 func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) {
@@ -5349,7 +5380,19 @@ func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID
     }
 
     ws.Add(iter.WatchCh())
-    return iter, nil
+
+    filter := func(raw interface{}) bool {
+        d, ok := raw.(*structs.ScalingPolicy)
+        if !ok {
+            return true
+        }
+
+        return d.Target[structs.ScalingTargetJob] != jobID
+    }
+
+    // Wrap the iterator in a filter
+    wrap := memdb.NewFilterIterator(iter, filter)
+    return wrap, nil
 }
 
 func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.ScalingPolicy, error) {
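The state store changes above keep the prefix scan over the target index but wrap the resulting iterator in `memdb.NewFilterIterator`, dropping any policy whose target does not exactly equal the requested namespace or job. Below is a minimal, self-contained sketch of that pattern; the `Policy` type and the single-field `target` index are simplified stand-ins for Nomad's `structs.ScalingPolicy` and its compound target index, and the table contents are made up:

```go
package main

import (
    "fmt"

    memdb "github.com/hashicorp/go-memdb"
)

// Policy is a simplified stand-in for structs.ScalingPolicy; TargetJob
// stands in for Nomad's compound target index.
type Policy struct {
    ID        string
    TargetJob string
}

func main() {
    schema := &memdb.DBSchema{
        Tables: map[string]*memdb.TableSchema{
            "scaling_policy": {
                Name: "scaling_policy",
                Indexes: map[string]*memdb.IndexSchema{
                    "id": {
                        Name:    "id",
                        Unique:  true,
                        Indexer: &memdb.StringFieldIndex{Field: "ID"},
                    },
                    "target": {
                        Name:    "target",
                        Indexer: &memdb.StringFieldIndex{Field: "TargetJob"},
                    },
                },
            },
        },
    }

    db, err := memdb.NewMemDB(schema)
    if err != nil {
        panic(err)
    }

    write := db.Txn(true)
    for _, p := range []*Policy{
        {ID: "p1", TargetJob: "web"},
        {ID: "p2", TargetJob: "web-canary"},
    } {
        if err := write.Insert("scaling_policy", p); err != nil {
            panic(err)
        }
    }
    write.Commit()

    read := db.Txn(false)
    defer read.Abort()

    // The prefix lookup for job "web" also yields the "web-canary" policy --
    // the shape of the bug this PR fixes.
    iter, err := read.Get("scaling_policy", "target_prefix", "web")
    if err != nil {
        panic(err)
    }

    // The fix: wrap the iterator in a filter that drops non-exact matches.
    exact := memdb.NewFilterIterator(iter, func(raw interface{}) bool {
        p, ok := raw.(*Policy)
        if !ok {
            return true
        }
        return p.TargetJob != "web"
    })

    for raw := exact.Next(); raw != nil; raw = exact.Next() {
        fmt.Println(raw.(*Policy).ID) // prints only "p1"
    }
}
```

Note that a go-memdb filter function returns `true` to exclude an element, which is why the filters in the diff compare the policy's target against the requested namespace or job ID and return `true` on a mismatch.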
123 changes: 123 additions & 0 deletions nomad/state/state_store_test.go
@@ -8640,6 +8640,59 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) {
     require.ElementsMatch([]string{policy2.ID}, policiesInOtherNamespace)
 }
 
+func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) {
+    t.Parallel()
+    require := require.New(t)
+
+    ns1 := "name"
+    ns2 := "name2" // matches prefix "name"
+    state := testStateStore(t)
+    policy1 := mock.ScalingPolicy()
+    policy1.Target[structs.ScalingTargetNamespace] = ns1
+    policy2 := mock.ScalingPolicy()
+    policy2.Target[structs.ScalingTargetNamespace] = ns2
+
+    ws1 := memdb.NewWatchSet()
+    iter, err := state.ScalingPoliciesByNamespace(ws1, ns1)
+    require.NoError(err)
+    require.Nil(iter.Next())
+
+    ws2 := memdb.NewWatchSet()
+    iter, err = state.ScalingPoliciesByNamespace(ws2, ns2)
+    require.NoError(err)
+    require.Nil(iter.Next())
+
+    err = state.UpsertScalingPolicies(1000, []*structs.ScalingPolicy{policy1, policy2})
+    require.NoError(err)
+    require.True(watchFired(ws1))
+    require.True(watchFired(ws2))
+
+    iter, err = state.ScalingPoliciesByNamespace(nil, ns1)
+    require.NoError(err)
+    policiesInNS1 := []string{}
+    for {
+        raw := iter.Next()
+        if raw == nil {
+            break
+        }
+        policiesInNS1 = append(policiesInNS1, raw.(*structs.ScalingPolicy).ID)
+    }
+    require.ElementsMatch([]string{policy1.ID}, policiesInNS1)
+
+    iter, err = state.ScalingPoliciesByNamespace(nil, ns2)
+    require.NoError(err)
+    policiesInNS2 := []string{}
+    for {
+        raw := iter.Next()
+        if raw == nil {
+            break
+        }
+        policiesInNS2 = append(policiesInNS2, raw.(*structs.ScalingPolicy).ID)
+    }
+    require.ElementsMatch([]string{policy2.ID}, policiesInNS2)
+}
+
+
 func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) {
     t.Parallel()
 
@@ -8940,6 +8993,37 @@ func TestStateStore_DeleteJob_DeleteScalingPolicies(t *testing.T) {
     require.True(index > 1001)
 }
 
+func TestStateStore_DeleteJob_DeleteScalingPoliciesPrefixBug(t *testing.T) {
+    t.Parallel()
+
+    require := require.New(t)
+
+    state := testStateStore(t)
+
+    job := mock.Job()
+    require.NoError(state.UpsertJob(1000, job))
+    job2 := job.Copy()
+    job2.ID = job.ID + "-but-longer"
+    require.NoError(state.UpsertJob(1001, job2))
+
+    policy := mock.ScalingPolicy()
+    policy.Target[structs.ScalingTargetJob] = job.ID
+    policy2 := mock.ScalingPolicy()
+    policy2.Target[structs.ScalingTargetJob] = job2.ID
+    require.NoError(state.UpsertScalingPolicies(1002, []*structs.ScalingPolicy{policy, policy2}))
+
+    // Delete job with the shorter prefix-ID
+    require.NoError(state.DeleteJob(1003, job.Namespace, job.ID))
+
+    // Ensure only the associated scaling policy was deleted, not the one matching the job with the longer ID
+    out, err := state.ScalingPolicyByID(nil, policy.ID)
+    require.NoError(err)
+    require.Nil(out)
+    out, err = state.ScalingPolicyByID(nil, policy2.ID)
+    require.NoError(err)
+    require.NotNil(out)
+}
+
 // This test ensures that deleting a job that doesn't have any scaling policies
 // will not cause the scaling_policy table index to increase, on either job
 // registration or deletion.
@@ -9035,6 +9119,45 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) {
     require.Equal(expect, found)
 }
 
+func TestStateStore_ScalingPoliciesByJob_PrefixBug(t *testing.T) {
+    t.Parallel()
+
+    require := require.New(t)
+
+    jobPrefix := "job-name-" + uuid.Generate()
+
+    state := testStateStore(t)
+    policy1 := mock.ScalingPolicy()
+    policy1.Target[structs.ScalingTargetJob] = jobPrefix
+    policy2 := mock.ScalingPolicy()
+    policy2.Target[structs.ScalingTargetJob] = jobPrefix + "-more"
+
+    // Create the policies
+    var baseIndex uint64 = 1000
+    err := state.UpsertScalingPolicies(baseIndex, []*structs.ScalingPolicy{policy1, policy2})
+    require.NoError(err)
+
+    iter, err := state.ScalingPoliciesByJob(nil,
+        policy1.Target[structs.ScalingTargetNamespace],
+        jobPrefix)
+    require.NoError(err)
+
+    // Ensure we see expected policies
+    count := 0
+    found := []string{}
+    for {
+        raw := iter.Next()
+        if raw == nil {
+            break
+        }
+        count++
+        found = append(found, raw.(*structs.ScalingPolicy).ID)
+    }
+    require.Equal(1, count)
+    expect := []string{policy1.ID}
+    require.Equal(expect, found)
+}
+
 func TestStateStore_UpsertScalingEvent(t *testing.T) {
     t.Parallel()
     require := require.New(t)