Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add lock update mechanism for jobs, closes #762 #786

Open
wants to merge 1 commit into
base: v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type executor struct {
jobsOutCompleted chan uuid.UUID
// used to request jobs from the scheduler
jobOutRequest chan jobOutRequest
// used to request jobs from the scheduler
jobOutUpdateLockRequest chan jobOutUpdateLockRequest

// used by the executor to receive a stop signal from the scheduler
stopCh chan struct{}
Expand Down Expand Up @@ -378,7 +380,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
e.incrementJobCounter(j, Skip)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{
id: j.id,
lock: lock,
}

defer func() {
_ = lock.Unlock(j.ctx)
}()
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil {
Expand All @@ -387,7 +396,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
e.incrementJobCounter(j, Skip)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{
id: j.id,
lock: lock,
}

defer func() {
_ = lock.Unlock(j.ctx)
}()
}
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)

Expand Down
8 changes: 8 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type internalJob struct {
nextScheduled []time.Time

lastRun time.Time
lastLock Lock
function any
parameters []any
timer clockwork.Timer
Expand Down Expand Up @@ -1026,6 +1027,7 @@ type Job interface {
RunNow() error
// Tags returns the job's string tags.
Tags() []string
Lock() Lock
}

var _ Job = (*job)(nil)
Expand Down Expand Up @@ -1126,3 +1128,9 @@ func (j job) RunNow() error {
}
return err
}

func (j job) Lock() Lock {
ij := requestJob(j.id, j.jobOutRequest)

return ij.lastLock
}
26 changes: 21 additions & 5 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ type jobOutRequest struct {
outChan chan internalJob
}

type jobOutUpdateLockRequest struct {
id uuid.UUID
lock Lock
}

type runJobRequest struct {
id uuid.UUID
outChan chan error
Expand All @@ -131,11 +136,12 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
logger: &noOpLogger{},
clock: clockwork.NewRealClock(),

jobsIn: make(chan jobIn),
jobsOutForRescheduling: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
done: make(chan error),
jobsIn: make(chan jobIn),
jobsOutForRescheduling: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
jobOutUpdateLockRequest: make(chan jobOutUpdateLockRequest),
done: make(chan error),
}

s := &scheduler{
Expand Down Expand Up @@ -190,6 +196,9 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
case out := <-s.jobOutRequestCh:
s.selectJobOutRequest(out)

case out := <-s.exec.jobOutUpdateLockRequest:
s.jobOutUpdateLockRequest(out)

case out := <-s.allJobsOutRequest:
s.selectAllJobsOutRequest(out)

Expand Down Expand Up @@ -434,6 +443,13 @@ func (s *scheduler) selectJobOutRequest(out jobOutRequest) {
close(out.outChan)
}

func (s *scheduler) jobOutUpdateLockRequest(out jobOutUpdateLockRequest) {
if j, ok := s.jobs[out.id]; ok {
j.lastLock = out.lock
s.jobs[out.id] = j
}
}

func (s *scheduler) selectNewJob(in newJobIn) {
j := in.job
if s.started {
Expand Down
41 changes: 41 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2633,3 +2633,44 @@
})
}
}

func TestJob_Lock(t *testing.T) {
locker := &testLocker{
notLocked: make(chan struct{}, 1),
}

s := newTestScheduler(t,
WithDistributedLocker(locker),
)

jobRan := make(chan struct{})
j, err := s.NewJob(
DurationJob(time.Millisecond*100),
NewTask(func() {
time.Sleep(50 * time.Millisecond)
jobRan <- struct{}{}
}),
)
require.NoError(t, err)

s.Start()
defer s.Shutdown()

Check failure on line 2657 in scheduler_test.go

View workflow job for this annotation

GitHub Actions / lint and test (1.21)

Error return value of `s.Shutdown` is not checked (errcheck)

select {
case <-jobRan:
// Job has run
case <-time.After(200 * time.Millisecond):
t.Fatal("Job did not run in time")
}

require.Eventually(t, func() bool {
if locker.jobLocked {

Check failure on line 2667 in scheduler_test.go

View workflow job for this annotation

GitHub Actions / lint and test (1.21)

S1008: should use 'return locker.jobLocked' instead of 'if locker.jobLocked { return true }; return false' (gosimple)
return true
}

return false
}, 200*time.Millisecond, 100*time.Millisecond, "Job should be locked")

lock := j.Lock()
assert.NotNil(t, lock, "Job Lock() should return a non-nil Locker")
}
Loading