Skip to content

Commit

Permalink
Remove api understanding of job dependencies
Browse files Browse the repository at this point in the history
This should keep the api from needing to understand
that a job can have a parent or dependent jobs. This
slightly simplifies the functions and allows SetJob on
the store to have controll over job editing.
  • Loading branch information
sysadmind committed Apr 30, 2018
1 parent 80f2222 commit 88c072c
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 86 deletions.
21 changes: 1 addition & 20 deletions dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,27 +142,8 @@ func (h *HTTPTransport) jobCreateOrUpdateHandler(c *gin.Context) {
}
c.BindJSON(&job)

// Get if the requested job already exist
ej, err := h.agent.Store.GetJob(job.Name)
if err != nil && err != store.ErrKeyNotFound {
c.AbortWithError(422, err)
return
}

// If it's an existing job, lock it
if ej != nil {
ej.Lock()
defer ej.Unlock()
}

// Save the job to the store
if err = h.agent.Store.SetJob(&job, ej); err != nil {
c.AbortWithError(422, err)
return
}

// Save the job parent
if err = h.agent.Store.SetJobDependencyTree(&job, ej); err != nil {
if err := h.agent.Store.SetJob(&job, true); err != nil {
c.AbortWithError(422, err)
return
}
Expand Down
4 changes: 4 additions & 0 deletions dkron/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func setupAPITest(t *testing.T) (a *Agent) {
"-node-name", "test",
"-server",
"-log-level", logLevel,
"-keyspace", "dkron-test",
}

c := NewConfig(args)
Expand All @@ -33,6 +34,9 @@ func setupAPITest(t *testing.T) (a *Agent) {
}
time.Sleep(1 * time.Second)

// clean up the keyspace to ensure clean runs
a.Store.Client.DeleteTree("dkron-test")

return
}

Expand Down
16 changes: 4 additions & 12 deletions dkron/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestJobGetParent(t *testing.T) {

// Cleanup everything
err := store.Client.DeleteTree("dkron-test")
if err != s.ErrKeyNotFound {
if err != nil && err != s.ErrKeyNotFound {
t.Logf("error cleaning up: %s", err)
}

Expand All @@ -26,7 +26,7 @@ func TestJobGetParent(t *testing.T) {
Schedule: "@every 2s",
}

if err := store.SetJob(parentTestJob, nil); err != nil {
if err := store.SetJob(parentTestJob, true); err != nil {
t.Fatalf("error creating job: %s", err)
}

Expand All @@ -36,10 +36,7 @@ func TestJobGetParent(t *testing.T) {
ParentJob: "parent_test",
}

err = store.SetJob(dependentTestJob, nil)
assert.NoError(t, err)

err = store.SetJobDependencyTree(dependentTestJob, nil)
err = store.SetJob(dependentTestJob, true)
assert.NoError(t, err)

parentTestJob, err = dependentTestJob.GetParent()
Expand All @@ -51,14 +48,9 @@ func TestJobGetParent(t *testing.T) {
assert.Equal(t, parentTestJob, ptj)

// Remove the parent job
ej, _ := store.GetJob(dependentTestJob.Name)

dependentTestJob.ParentJob = ""
dependentTestJob.Schedule = "@every 2m"
err = store.SetJob(dependentTestJob, nil)
assert.NoError(t, err)

err = store.SetJobDependencyTree(dependentTestJob, ej)
err = store.SetJob(dependentTestJob, true)
assert.NoError(t, err)

dtj, _ := store.GetJob(dependentTestJob.Name)
Expand Down
2 changes: 1 addition & 1 deletion dkron/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestRPCExecutionDone(t *testing.T) {
Disabled: true,
}

if err := store.SetJob(testJob, nil); err != nil {
if err := store.SetJob(testJob, true); err != nil {
t.Fatalf("error creating job: %s", err)
}

Expand Down
97 changes: 45 additions & 52 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,9 @@ func NewStore(backend string, machines []string, a *Agent, keyspace string, conf
}

// Store a job
func (s *Store) SetJob(job *Job, previousJob *Job) error {
func (s *Store) SetJob(job *Job, copyDependentJobs bool) error {
//Existing job that has children, let's keep it's children
if previousJob != nil && len(previousJob.DependentJobs) != 0 {
job.DependentJobs = previousJob.DependentJobs
}

// Sanitize the job name
job.Name = generateSlug(job.Name)
jobKey := fmt.Sprintf("%s/jobs/%s", s.keyspace, job.Name)
Expand All @@ -88,6 +86,8 @@ func (s *Store) SetJob(job *Job, previousJob *Job) error {
return err
}
if ej != nil {
ej.Lock()
ej.Unlock()
// When the job runs, these status vars are updated
// otherwise use the ones that are stored
if ej.LastError.After(job.LastError) {
Expand All @@ -102,6 +102,9 @@ func (s *Store) SetJob(job *Job, previousJob *Job) error {
if ej.ErrorCount > job.ErrorCount {
job.ErrorCount = ej.ErrorCount
}
if len(ej.DependentJobs) != 0 && copyDependentJobs {
job.DependentJobs = ej.DependentJobs
}
}

jobJSON, _ := json.Marshal(job)
Expand All @@ -115,76 +118,66 @@ func (s *Store) SetJob(job *Job, previousJob *Job) error {
return err
}

return nil
}

func (s *Store) AtomicJobPut(job *Job, prevJobKVPair *store.KVPair) (bool, error) {
jobKey := fmt.Sprintf("%s/jobs/%s", s.keyspace, job.Name)
jobJSON, _ := json.Marshal(job)

ok, _, err := s.Client.AtomicPut(jobKey, jobJSON, prevJobKVPair, nil)

return ok, err
}
if ej != nil {
// Existing job that doesn't have parent job set and it's being set
if ej.ParentJob == "" && job.ParentJob != "" {
pj, err := job.GetParent()
if err != nil {
return err
}

// Set the depencency tree for a job given the job and the previous version
// of the Job or nil if it's new.
func (s *Store) SetJobDependencyTree(job *Job, previousJob *Job) error {
// Existing job that doesn't have parent job set and it's being set
if previousJob != nil && previousJob.ParentJob == "" && job.ParentJob != "" {
pj, err := job.GetParent()
if err != nil {
return err
pj.DependentJobs = append(pj.DependentJobs, job.Name)
if err := s.SetJob(pj, false); err != nil {
return err
}
}
pj.Lock()
defer pj.Unlock()

pj.DependentJobs = append(pj.DependentJobs, job.Name)
if err := s.SetJob(pj, nil); err != nil {
return err
}
}
// Existing job that has parent job set and it's being removed
if ej.ParentJob != "" && job.ParentJob == "" {
pj, err := ej.GetParent()
if err != nil {
return err
}

// Existing job that has parent job set and it's being removed
if previousJob != nil && previousJob.ParentJob != "" && job.ParentJob == "" {
pj, err := previousJob.GetParent()
if err != nil {
return err
}
pj.Lock()
defer pj.Unlock()

ndx := 0
for i, djn := range pj.DependentJobs {
if djn == job.Name {
ndx = i
break
ndx := 0
for i, djn := range pj.DependentJobs {
if djn == job.Name {
ndx = i
break
}
}
pj.DependentJobs = append(pj.DependentJobs[:ndx], pj.DependentJobs[ndx+1:]...)
if err := s.SetJob(pj, false); err != nil {
return err
}
}
pj.DependentJobs = append(pj.DependentJobs[:ndx], pj.DependentJobs[ndx+1:]...)
if err := s.SetJob(pj, nil); err != nil {
return err
}
}

// New job that has parent job set
if previousJob == nil && job.ParentJob != "" {
if ej == nil && job.ParentJob != "" {
pj, err := job.GetParent()
if err != nil {
return err
}
pj.Lock()
defer pj.Unlock()

pj.DependentJobs = append(pj.DependentJobs, job.Name)
if err := s.SetJob(pj, nil); err != nil {
if err := s.SetJob(pj, false); err != nil {
return err
}
}

return nil
}

func (s *Store) AtomicJobPut(job *Job, prevJobKVPair *store.KVPair) (bool, error) {
jobKey := fmt.Sprintf("%s/jobs/%s", s.keyspace, job.Name)
jobJSON, _ := json.Marshal(job)

ok, _, err := s.Client.AtomicPut(jobKey, jobJSON, prevJobKVPair, nil)

return ok, err
}

func (s *Store) validateJob(job *Job) error {
if job.ParentJob == job.Name {
return ErrSameParent
Expand Down
2 changes: 1 addition & 1 deletion dkron/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestStore(t *testing.T) {
}
assert.NotNil(t, jobs, "jobs nil, expecting empty slice")

if err := s.SetJob(testJob, nil); err != nil {
if err := s.SetJob(testJob, true); err != nil {
t.Fatalf("error creating job: %s", err)
}

Expand Down

0 comments on commit 88c072c

Please sign in to comment.