Skip to content

Commit

Permalink
state_store/fix the prefix bugs for scaling policies documented in 1a…
Browse files Browse the repository at this point in the history
…9318
  • Loading branch information
cgbaker committed Aug 27, 2020
1 parent 1a9318a commit 78e1add
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ IMPROVEMENTS:
BUG FIXES:

* core: Fixed a bug where unpromoted job versions are used when rescheduling failed allocations [[GH-8691](https://github.com/hashicorp/nomad/issues/8691)]
* core: Fixed bugs where scaling policies could be matched against incorrect jobs with a similar prefix [[GH-8753](https://github.com/hashicorp/nomad/issues/8753)]

## 0.12.3 (August 13, 2020)

Expand Down
53 changes: 48 additions & 5 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1539,11 +1539,30 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn

// deleteJobScalingPolicies deletes any scaling policies associated with the job
func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error {
numDeletedScalingPolicies, err := txn.DeleteAll("scaling_policy", "target_prefix", job.Namespace, job.ID)
iter, err := s.ScalingPoliciesByJobTxn(nil, job.Namespace, job.ID, txn)
if err != nil {
return fmt.Errorf("deleting job scaling policies failed: %v", err)
return fmt.Errorf("getting job scaling policies for deletion failed: %v", err)
}

// Put them into a slice so there are no safety concerns while actually
// performing the deletes
policies := []interface{}{}
for {
raw := iter.Next()
if raw == nil {
break
}
policies = append(policies, raw)
}

// Do the deletes
for _, p := range policies {
if err := txn.Delete("scaling_policy", p); err != nil {
return fmt.Errorf("deleting scaling policy failed: %v", err)
}
}
if numDeletedScalingPolicies > 0 {

if len(policies) > 0 {
if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
Expand Down Expand Up @@ -5332,7 +5351,19 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str
}

ws.Add(iter.WatchCh())
return iter, nil

filter := func(raw interface{}) bool {
d, ok := raw.(*structs.ScalingPolicy)
if !ok {
return true
}

return d.Target[structs.ScalingTargetNamespace] != namespace
}

// Wrap the iterator in a filter
wrap := memdb.NewFilterIterator(iter, filter)
return wrap, nil
}

func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) {
Expand All @@ -5349,7 +5380,19 @@ func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID
}

ws.Add(iter.WatchCh())
return iter, nil

filter := func(raw interface{}) bool {
d, ok := raw.(*structs.ScalingPolicy)
if !ok {
return true
}

return d.Target[structs.ScalingTargetJob] != jobID
}

// Wrap the iterator in a filter
wrap := memdb.NewFilterIterator(iter, filter)
return wrap, nil
}

func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.ScalingPolicy, error) {
Expand Down

0 comments on commit 78e1add

Please sign in to comment.