From 6f5610c9c48ca9a2f2432e055fb747e748e83d0f Mon Sep 17 00:00:00 2001
From: Chris Baker <1675087+cgbaker@users.noreply.github.com>
Date: Tue, 21 Apr 2020 23:01:26 +0000
Subject: [PATCH] modify state store so that autoscaling policies are deleted
 from their table when a job is stopped (and recreated when the job is
 started)
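
This makes a job's scaling policies follow the job's stop/start
lifecycle: stopping a job removes its policies from the scaling_policy
table (firing watches and advancing the table index), and starting it
again re-creates them. A rough sketch of the observable behavior,
written against the same test helpers the tests below use
(testStateStore, watchFired, mock.JobWithScalingPolicy); the test name
and state-store indexes here are illustrative only, not part of this
change:

    package state

    import (
        "testing"

        memdb "github.com/hashicorp/go-memdb"
        "github.com/hashicorp/nomad/nomad/mock"
        "github.com/stretchr/testify/require"
    )

    // Illustrative sketch: scaling policies track job.Stop.
    func TestSketch_ScalingPolicyLifecycle(t *testing.T) {
        state := testStateStore(t)
        job, policy := mock.JobWithScalingPolicy()

        // Upserting a running job creates its scaling policy.
        require.NoError(t, state.UpsertJob(1000, job))
        out, err := state.ScalingPolicyByTarget(nil, policy.Target)
        require.NoError(t, err)
        require.NotNil(t, out)

        // Stopping the job deletes the policy and fires watches.
        ws := memdb.NewWatchSet()
        _, err = state.ScalingPolicies(ws)
        require.NoError(t, err)
        job.Stop = true
        require.NoError(t, state.UpsertJob(1100, job))
        require.True(t, watchFired(ws))
        out, err = state.ScalingPolicyByTarget(nil, policy.Target)
        require.NoError(t, err)
        require.Nil(t, out)

        // Starting the job again re-creates the policy.
        job.Stop = false
        require.NoError(t, state.UpsertJob(1200, job))
        out, err = state.ScalingPolicyByTarget(nil, policy.Target)
        require.NoError(t, err)
        require.NotNil(t, out)
    }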
---
 nomad/state/state_store.go      | 31 ++++++++---
 nomad/state/state_store_test.go | 91 ++++++++++++++++++++++++++++++++-
 2 files changed, 113 insertions(+), 9 deletions(-)

diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 8e6e644b0b94..0ff24b381670 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -1478,16 +1478,10 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
 		return fmt.Errorf("index update failed: %v", err)
 	}
 
-	// Delete any job scaling policies
-	numDeletedScalingPolicies, err := txn.DeleteAll("scaling_policy", "target_prefix", namespace, jobID)
-	if err != nil {
+	// Delete any remaining job scaling policies
+	if err := s.deleteJobScalingPolicies(index, job, txn); err != nil {
 		return fmt.Errorf("deleting job scaling policies failed: %v", err)
 	}
-	if numDeletedScalingPolicies > 0 {
-		if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
-			return fmt.Errorf("index update failed: %v", err)
-		}
-	}
 
 	// Delete the scaling events
 	if _, err = txn.DeleteAll("scaling_event", "id", namespace, jobID); err != nil {
@@ -1506,6 +1500,20 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
 	return nil
 }
 
+// deleteJobScalingPolicies deletes any scaling policies associated with the job
+func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error {
+	numDeletedScalingPolicies, err := txn.DeleteAll("scaling_policy", "target_prefix", job.Namespace, job.ID)
+	if err != nil {
+		return fmt.Errorf("deleting job scaling policies failed: %v", err)
+	}
+	if numDeletedScalingPolicies > 0 {
+		if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
+			return fmt.Errorf("index update failed: %v", err)
+		}
+	}
+	return nil
+}
+
 // deleteJobVersions deletes all versions of the given job.
 func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memdb.Txn) error {
 	iter, err := txn.Get("job_version", "id_prefix", job.Namespace, job.ID)
@@ -4247,6 +4255,13 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx
 
 	ws := memdb.NewWatchSet()
 
+	if job.Stop {
+		if err := s.deleteJobScalingPolicies(index, job, txn); err != nil {
+			return fmt.Errorf("deleting job scaling policies failed: %v", err)
+		}
+		return nil
+	}
+
 	scalingPolicies := job.GetScalingPolicies()
 	newTargets := map[string]struct{}{}
 	for _, p := range scalingPolicies {
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index 6ec8cc67221e..2a16fec7318d 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -8427,7 +8427,96 @@ func TestStateStore_DeleteScalingPolicies(t *testing.T) {
 	require.False(watchFired(ws))
 }
 
-func TestStateStore_DeleteJob_ChildScalingPolicies(t *testing.T) {
+func TestStateStore_StopJob_DeleteScalingPolicies(t *testing.T) {
+	t.Parallel()
+
+	require := require.New(t)
+
+	state := testStateStore(t)
+
+	job := mock.Job()
+
+	err := state.UpsertJob(1000, job)
+	require.NoError(err)
+
+	policy := mock.ScalingPolicy()
+	policy.Target[structs.ScalingTargetJob] = job.ID
+	err = state.UpsertScalingPolicies(1100, []*structs.ScalingPolicy{policy})
+	require.NoError(err)
+
+	// Ensure the scaling policy is present and start some watches
+	wsGet := memdb.NewWatchSet()
+	out, err := state.ScalingPolicyByTarget(wsGet, policy.Target)
+	require.NoError(err)
+	require.NotNil(out)
+	wsList := memdb.NewWatchSet()
+	_, err = state.ScalingPolicies(wsList)
+	require.NoError(err)
+
+	// Stop the job
+	job, err = state.JobByID(nil, job.Namespace, job.ID)
+	require.NoError(err)
+	job.Stop = true
+	err = state.UpsertJob(1200, job)
+	require.NoError(err)
+
+	// Ensure:
+	// * the scaling policy was deleted
+	// * the watches were fired
+	// * the table index was advanced
+	require.True(watchFired(wsGet))
+	require.True(watchFired(wsList))
+	out, err = state.ScalingPolicyByTarget(nil, policy.Target)
+	require.NoError(err)
+	require.Nil(out)
+	index, err := state.Index("scaling_policy")
+	require.GreaterOrEqual(index, uint64(1200))
+}
+
+func TestStateStore_UnstopJob_UpsertScalingPolicies(t *testing.T) {
+	t.Parallel()
+
+	require := require.New(t)
+
+	state := testStateStore(t)
+
+	job, policy := mock.JobWithScalingPolicy()
+	job.Stop = true
+
+	// establish watcher, verify there are no scaling policies yet
+	ws := memdb.NewWatchSet()
+	list, err := state.ScalingPolicies(ws)
+	require.NoError(err)
+	require.Nil(list.Next())
+
+	// upsert a stopped job, verify that we don't fire the watcher or add any scaling policies
+	err = state.UpsertJob(1000, job)
+	require.NoError(err)
+	require.False(watchFired(ws))
+	// stopped job should have no scaling policies, watcher doesn't fire
+	list, err = state.ScalingPolicies(ws)
+	require.NoError(err)
+	require.Nil(list.Next())
+
+	// Establish a new watcher
+	ws = memdb.NewWatchSet()
+	_, err = state.ScalingPolicies(ws)
+	require.NoError(err)
+	// Unstop this job, say you'll run it again...
+	job.Stop = false
+	err = state.UpsertJob(1100, job)
+	require.NoError(err)
+
+	// Ensure the scaling policy was added, watch was fired, index was advanced
+	require.True(watchFired(ws))
+	out, err := state.ScalingPolicyByTarget(nil, policy.Target)
+	require.NoError(err)
+	require.NotNil(out)
+	index, err := state.Index("scaling_policy")
+	require.GreaterOrEqual(index, uint64(1100))
+}
+
+func TestStateStore_DeleteJob_DeleteScalingPolicies(t *testing.T) {
 	t.Parallel()
 
 	require := require.New(t)
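-- 
Note for reviewers (kept below the signature separator so git-am drops
it): the renamed and added tests can be exercised with something like

    go test ./nomad/state/ -run 'ScalingPolicies' -v

where the -run pattern is illustrative; it simply matches the test
names touched by this patch.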