Skip to content

Commit

Permalink
after lock error listener (#734)
Browse files Browse the repository at this point in the history
* after job listener

* fixing lint issues

---------

Co-authored-by: lv90no <manuel.doncel.martos@ing.com>
  • Loading branch information
manuelarte and lv90no authored Jun 19, 2024
1 parent b12ca98 commit 8949701
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 0 deletions.
32 changes: 32 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gocron_test

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -10,6 +12,14 @@ import (
"github.com/jonboulle/clockwork"
)

var _ Locker = new(errorLocker)

type errorLocker struct{}

func (e errorLocker) Lock(_ context.Context, _ string) (Lock, error) {
return nil, errors.New("locked")
}

func ExampleAfterJobRuns() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()
Expand Down Expand Up @@ -52,6 +62,28 @@ func ExampleAfterJobRunsWithError() {
)
}

func ExampleAfterLockError() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()

_, _ = s.NewJob(
DurationJob(
time.Second,
),
NewTask(
func() {},
),
WithDistributedJobLocker(&errorLocker{}),
WithEventListeners(
AfterLockError(
func(jobID uuid.UUID, jobName string, err error) {
// do something immediately before the job is run
},
),
),
)
}

func ExampleBeforeJobRuns() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()
Expand Down
2 changes: 2 additions & 0 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
} else if j.locker != nil {
lock, err := j.locker.Lock(j.ctx, j.name)
if err != nil {
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
e.sendOutForRescheduling(&jIn)
e.incrementJobCounter(j, Skip)
return
Expand All @@ -351,6 +352,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil {
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
e.sendOutForRescheduling(&jIn)
e.incrementJobCounter(j, Skip)
return
Expand Down
13 changes: 13 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type internalJob struct {
afterJobRuns func(jobID uuid.UUID, jobName string)
beforeJobRuns func(jobID uuid.UUID, jobName string)
afterJobRunsWithError func(jobID uuid.UUID, jobName string, err error)
afterLockError func(jobID uuid.UUID, jobName string, err error)

locker Locker
}
Expand Down Expand Up @@ -639,6 +640,18 @@ func BeforeJobRuns(eventListenerFunc func(jobID uuid.UUID, jobName string)) Even
}
}

// AfterLockError is used to when the distributed locker returns an error and
// then run the provided function.
func AfterLockError(eventListenerFunc func(jobID uuid.UUID, jobName string, err error)) EventListener {
return func(j *internalJob) error {
if eventListenerFunc == nil {
return ErrEventListenerFuncNil
}
j.afterLockError = eventListenerFunc
return nil
}
}

// -----------------------------------------------
// -----------------------------------------------
// ---------------- Job Schedules ----------------
Expand Down
11 changes: 11 additions & 0 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,12 +437,20 @@ func TestWithEventListeners(t *testing.T) {
},
nil,
},
{
"afterLockError",
[]EventListener{
AfterLockError(func(_ uuid.UUID, _ string, _ error) {}),
},
nil,
},
{
"multiple event listeners",
[]EventListener{
AfterJobRuns(func(_ uuid.UUID, _ string) {}),
AfterJobRunsWithError(func(_ uuid.UUID, _ string, _ error) {}),
BeforeJobRuns(func(_ uuid.UUID, _ string) {}),
AfterLockError(func(_ uuid.UUID, _ string, _ error) {}),
},
nil,
},
Expand Down Expand Up @@ -488,6 +496,9 @@ func TestWithEventListeners(t *testing.T) {
if ij.beforeJobRuns != nil {
count++
}
if ij.afterLockError != nil {
count++
}
assert.Equal(t, len(tt.eventListeners), count)
})
}
Expand Down
69 changes: 69 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gocron

import (
"context"
"errors"
"fmt"
"os"
"sync"
Expand Down Expand Up @@ -49,6 +50,14 @@ func newTestScheduler(t *testing.T, options ...SchedulerOption) Scheduler {
return s
}

var _ Locker = new(errorLocker)

type errorLocker struct{}

func (e errorLocker) Lock(_ context.Context, _ string) (Lock, error) {
return nil, errors.New("locked")
}

func TestScheduler_OneSecond_NoOptions(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
cronNoOptionsCh := make(chan struct{}, 10)
Expand Down Expand Up @@ -1631,6 +1640,66 @@ func TestScheduler_WithEventListeners(t *testing.T) {
}
}

func TestScheduler_WithLocker_WithEventListeners(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

listenerRunCh := make(chan error, 1)
tests := []struct {
name string
locker Locker
tsk Task
el EventListener
expectRun bool
expectErr error
}{
{
"AfterLockError",
errorLocker{},
NewTask(func() {}),
AfterLockError(func(_ uuid.UUID, _ string, err error) {
listenerRunCh <- nil
}),
true,
nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t)
_, err := s.NewJob(
DurationJob(time.Minute*10),
tt.tsk,
WithStartAt(
WithStartImmediately(),
),
WithDistributedJobLocker(tt.locker),
WithEventListeners(tt.el),
WithLimitedRuns(1),
)
require.NoError(t, err)

s.Start()
if tt.expectRun {
select {
case err = <-listenerRunCh:
assert.ErrorIs(t, err, tt.expectErr)
case <-time.After(time.Second):
t.Fatal("timed out waiting for listener to run")
}
} else {
select {
case <-listenerRunCh:
t.Fatal("listener ran when it shouldn't have")
case <-time.After(time.Millisecond * 100):
}
}

require.NoError(t, s.Shutdown())
})
}
}

func TestScheduler_ManyJobs(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

Expand Down

0 comments on commit 8949701

Please sign in to comment.