Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Refactored queuedJob to use promise abstraction.
Browse files Browse the repository at this point in the history
  • Loading branch information
ConnorDoyle committed Dec 22, 2015
1 parent 8ba0f97 commit 869c954
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 103 deletions.
51 changes: 9 additions & 42 deletions scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"bytes"
"encoding/gob"
"fmt"
"sync"
"time"

log "github.com/Sirupsen/logrus"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/cdata"
"github.com/intelsdi-x/snap/core/ctypes"
. "github.com/intelsdi-x/snap/pkg/promise"
)

const (
Expand All @@ -51,25 +51,18 @@ const (
// Await) are idempotent and thread-safe.
type queuedJob interface {
Job() job
IsComplete() bool
Complete()
Await() []error
AndThen(f func(queuedJob))
Promise() Promise
}

type qj struct {
sync.Mutex

job job
complete bool
completeChan chan struct{}
job job
promise Promise
}

func newQueuedJob(job job) queuedJob {
return &qj{
job: job,
complete: false,
completeChan: make(chan struct{}),
job: job,
promise: NewPromise(),
}
}

Expand All @@ -78,35 +71,9 @@ func (j *qj) Job() job {
return j.job
}

// Returns whether this job is complete yet, without blocking.
func (j *qj) IsComplete() bool {
return j.complete
}

// This function unblocks everyone waiting for job completion.
func (j *qj) Complete() {
j.Lock()
defer j.Unlock()

if !j.complete {
j.complete = true
close(j.completeChan)
}
}

// This function BLOCKS the caller until the job is
// marked complete.
func (j *qj) Await() []error {
<-j.completeChan
return j.Job().Errors()
}

// Invokes the supplied function after the job completes.
func (j *qj) AndThen(f func(queuedJob)) {
go func() {
j.Await()
f(j)
}()
// Returns the underlying promise.
func (j *qj) Promise() Promise {
return j.promise
}

// Primary type for job inside
Expand Down
53 changes: 3 additions & 50 deletions scheduler/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ limitations under the License.
package scheduler

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -95,57 +94,11 @@ func TestQueuedJob(t *testing.T) {
So(qj.Job(), ShouldEqual, cj)
})
})
Convey("IsComplete()", t, func() {
Convey("it should return the completion status", func() {
Convey("Promise()", t, func() {
Convey("it should return the underlying promise", func() {
cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt)
qj := newQueuedJob(cj)
So(qj.IsComplete(), ShouldBeFalse)
qj.Complete()
So(qj.IsComplete(), ShouldBeTrue)
})
})
Convey("Complete()", t, func() {
Convey("it should unblock any waiting goroutines", func() {
cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt)
qj := newQueuedJob(cj)

numWaiters := 3
var wg sync.WaitGroup
wg.Add(numWaiters)

for i := 0; i < numWaiters; i++ {
go func() {
qj.Await()
wg.Done()
}()
}

qj.Complete()
wg.Wait()
})
})
Convey("AndThen()", t, func() {
Convey("it should defer the supplied closure until after completion", func() {
cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt)
qj := newQueuedJob(cj)

funcRan := false
c := make(chan struct{})

qj.AndThen(func(queuedJob) {
funcRan = true
close(c)
})

// The callback should not have been executed yet.
So(funcRan, ShouldBeFalse)

// Trigger callback execution by completing the queued job.
qj.Complete()

// Wait for the deferred function to be executed.
<-c
So(funcRan, ShouldBeTrue)
So(qj.Promise().IsComplete(), ShouldBeFalse)
})
})
}
2 changes: 1 addition & 1 deletion scheduler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (q *queue) start() {
Job: e.Job(),
}
q.Err <- qe
e.Complete() // Signal job termination.
e.Promise().Complete([]error{qe}) // Signal job termination.
continue
}

Expand Down
8 changes: 4 additions & 4 deletions scheduler/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ func TestQueue(t *testing.T) {
x := 0
q := newQueue(5, func(j queuedJob) {
x = 1
j.Complete()
j.Promise().Complete([]error{})
})
q.Start()
j := &collectorJob{coreJob: &coreJob{}}
qj := newQueuedJob(j)
q.Event <- qj
qj.Await()
qj.Promise().Await()
So(x, ShouldEqual, 1)
q.Stop()
})
Expand All @@ -54,7 +54,7 @@ func TestQueue(t *testing.T) {
x := []time.Time{}
q := newQueue(5, func(j queuedJob) {
x = append(x, j.Job().Deadline())
j.Complete()
j.Promise().Complete([]error{})
})
q.Start()

Expand All @@ -66,7 +66,7 @@ func TestQueue(t *testing.T) {
j := &collectorJob{coreJob: &coreJob{}}
j.deadline = time.Now().Add(time.Duration(i) * time.Second)
qj := newQueuedJob(j)
qj.AndThen(func(queuedJob) { wg.Done() })
qj.Promise().AndThen(func(errors []error) { wg.Done() })
q.Event <- qj
}

Expand Down
2 changes: 1 addition & 1 deletion scheduler/work_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestWorkerManager(t *testing.T) {

// Wait for all queued jobs to be marked complete.
for _, qj := range qjs {
qj.Await()
qj.Promise().Await()
}

// The work queue should be empty at this point.
Expand Down
2 changes: 1 addition & 1 deletion scheduler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (w *worker) start() {
// mark the job complete for one of two reasons:
// - this job was just run
// - the deadline was exceeded and this job will not run
q.Complete()
q.Promise().Complete(q.Job().Errors())

// the single kill-channel -- used when resizing worker pools
case <-w.kamikaze:
Expand Down
2 changes: 1 addition & 1 deletion scheduler/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestWorker(t *testing.T) {
chrono.Chrono.Forward(1500 * time.Millisecond)
qj := newQueuedJob(mj)
rcv <- qj
qj.Await()
qj.Promise().Await()
So(mj.worked, ShouldEqual, false)
})
Convey("stops the worker if kamikaze chan is closed", t, func() {
Expand Down
6 changes: 3 additions & 3 deletions scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (s *schedulerWorkflow) Start(t *task) {

// dispatch 'collect' job to be worked
// Block until the job has been either run or skipped.
errors := t.manager.Work(j).Await()
errors := t.manager.Work(j).Promise().Await()

if len(errors) != 0 {
t.failedRuns++
Expand Down Expand Up @@ -337,7 +337,7 @@ func (s *schedulerWorkflow) StateString() string {
func (s *schedulerWorkflow) workJobs(prs []*processNode, pus []*publishNode, t *task, pj job) {
for _, pr := range prs {
j := newProcessJob(pj, pr.Name(), pr.Version(), pr.InboundContentType, pr.config.Table(), t.metricsManager)
errors := t.manager.Work(j).Await()
errors := t.manager.Work(j).Promise().Await()
if len(errors) != 0 {
t.failedRuns++
t.lastFailureTime = t.lastFireTime
Expand All @@ -349,7 +349,7 @@ func (s *schedulerWorkflow) workJobs(prs []*processNode, pus []*publishNode, t *
}
for _, pu := range pus {
j := newPublishJob(pj, pu.Name(), pu.Version(), pu.InboundContentType, pu.config.Table(), t.metricsManager)
errors := t.manager.Work(j).Await()
errors := t.manager.Work(j).Promise().Await()
if len(errors) != 0 {
t.failedRuns++
t.lastFailureTime = t.lastFireTime
Expand Down

0 comments on commit 869c954

Please sign in to comment.