Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(queue): handle errors in ScheduledJobs and Size methods #125

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions examples/queue/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ func main() {
}, jobQueue)
scheduler.Start(ctx)

if jobQueue.Size() == 0 {
jobQueueSize, err := jobQueue.Size()
if err != nil {
logger.Errorf("Failed to fetch job queue size: %s", err)
return
}

if jobQueueSize == 0 {
logger.Info("Scheduling new jobs")
jobDetail1 := quartz.NewJobDetail(&printJob{5}, quartz.NewJobKey("job1"))
if err := scheduler.ScheduleJob(jobDetail1, quartz.NewSimpleTrigger(5*time.Second)); err != nil {
Expand All @@ -58,7 +64,11 @@ func main() {

<-ctx.Done()

scheduledJobs := jobQueue.ScheduledJobs(nil)
scheduledJobs, err := jobQueue.ScheduledJobs(nil)
if err != nil {
logger.Errorf("Failed to fetch scheduled jobs: %s", err)
return
}
jobNames := make([]string, 0, len(scheduledJobs))
for _, job := range scheduledJobs {
jobNames = append(jobNames, job.JobDetail().JobKey().String())
Expand Down Expand Up @@ -289,14 +299,16 @@ func (jq *jobQueue) Remove(jobKey *quartz.JobKey) (quartz.ScheduledJob, error) {
}

// ScheduledJobs returns the slice of all scheduled jobs in the queue.
func (jq *jobQueue) ScheduledJobs(matchers []quartz.Matcher[quartz.ScheduledJob]) []quartz.ScheduledJob {
func (jq *jobQueue) ScheduledJobs(
matchers []quartz.Matcher[quartz.ScheduledJob],
) ([]quartz.ScheduledJob, error) {
jq.mtx.Lock()
defer jq.mtx.Unlock()
logger.Trace("ScheduledJobs")
var jobs []quartz.ScheduledJob
fileInfo, err := os.ReadDir(dataFolder)
if err != nil {
return jobs
return nil, err
}
for _, file := range fileInfo {
if !file.IsDir() {
Expand All @@ -309,7 +321,7 @@ func (jq *jobQueue) ScheduledJobs(matchers []quartz.Matcher[quartz.ScheduledJob]
}
}
}
return jobs
return jobs, nil
}

func isMatch(job quartz.ScheduledJob, matchers []quartz.Matcher[quartz.ScheduledJob]) bool {
Expand All @@ -323,12 +335,15 @@ func isMatch(job quartz.ScheduledJob, matchers []quartz.Matcher[quartz.Scheduled
}

// Size returns the size of the job queue.
func (jq *jobQueue) Size() int {
func (jq *jobQueue) Size() (int, error) {
jq.mtx.Lock()
defer jq.mtx.Unlock()
logger.Trace("Size")
files, _ := os.ReadDir(dataFolder)
return len(files)
files, err := os.ReadDir(dataFolder)
if err != nil {
return 0, err
}
return len(files), nil
}

// Clear clears the job queue.
Expand Down
16 changes: 9 additions & 7 deletions quartz/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ type JobQueue interface {
Push(job ScheduledJob) error

// Pop removes and returns the next to run scheduled job from the queue.
// Implementations should return quartz.ErrQueueEmpty if the queue is empty.
Pop() (ScheduledJob, error)

// Head returns the first scheduled job without removing it from the queue.
// Implementations should return quartz.ErrQueueEmpty if the queue is empty.
Head() (ScheduledJob, error)

// Get returns the scheduled job with the specified key without removing it
Expand All @@ -75,10 +77,10 @@ type JobQueue interface {
// // ... WHERE group_name = m.Pattern
// }
// }
ScheduledJobs([]Matcher[ScheduledJob]) []ScheduledJob
ScheduledJobs([]Matcher[ScheduledJob]) ([]ScheduledJob, error)

// Size returns the size of the job queue.
Size() int
Size() (int, error)

// Clear clears the job queue.
Clear() error
Expand Down Expand Up @@ -212,11 +214,11 @@ func (jq *jobQueue) Remove(jobKey *JobKey) (ScheduledJob, error) {
// ScheduledJobs returns a slice of scheduled jobs in the queue.
// For a job to be returned, it must satisfy all of the specified matchers.
// Given an empty matchers it returns all scheduled jobs.
func (jq *jobQueue) ScheduledJobs(matchers []Matcher[ScheduledJob]) []ScheduledJob {
func (jq *jobQueue) ScheduledJobs(matchers []Matcher[ScheduledJob]) ([]ScheduledJob, error) {
jq.mtx.Lock()
defer jq.mtx.Unlock()
if len(matchers) == 0 {
return jq.scheduledJobs()
return jq.scheduledJobs(), nil
}
matchedJobs := make([]ScheduledJob, 0)
JobLoop:
Expand All @@ -229,7 +231,7 @@ JobLoop:
}
matchedJobs = append(matchedJobs, job)
}
return matchedJobs
return matchedJobs, nil
}

// scheduledJobs returns all scheduled jobs.
Expand All @@ -242,10 +244,10 @@ func (jq *jobQueue) scheduledJobs() []ScheduledJob {
}

// Size returns the size of the job queue.
func (jq *jobQueue) Size() int {
func (jq *jobQueue) Size() (int, error) {
jq.mtx.Lock()
defer jq.mtx.Unlock()
return len(jq.delegate)
return len(jq.delegate), nil
}

// Clear clears the job queue.
Expand Down
50 changes: 26 additions & 24 deletions quartz/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) []*JobK
sched.mtx.Lock()
defer sched.mtx.Unlock()

scheduledJobs := sched.queue.ScheduledJobs(matchers)
scheduledJobs, _ := sched.queue.ScheduledJobs(matchers)
keys := make([]*JobKey, 0, len(scheduledJobs))
for _, scheduled := range scheduledJobs {
keys = append(keys, scheduled.JobDetail().jobKey)
Expand Down Expand Up @@ -382,31 +382,33 @@ func (sched *StdScheduler) Stop() {

func (sched *StdScheduler) startExecutionLoop(ctx context.Context) {
defer sched.wg.Done()
maxTimerDuration := time.Duration(1<<63 - 1)
timer := time.NewTimer(maxTimerDuration)
for {
if sched.queue.Size() == 0 {
select {
case <-sched.interrupt:
logger.Trace("Interrupted in empty queue.")
case <-ctx.Done():
logger.Info("Exit the empty execution loop.")
return
}
} else {
timer := time.NewTimer(sched.calculateNextTick())
select {
case <-timer.C:
logger.Trace("Tick.")
sched.executeAndReschedule(ctx)
queueSize, err := sched.queue.Size()
switch {
case err != nil:
logger.Errorf("Failed to fetch queue size: %s", err)
timer.Reset(sched.opts.RetryInterval)
case queueSize == 0:
logger.Trace("Queue is empty.")
timer.Reset(maxTimerDuration)
default:
timer.Reset(sched.calculateNextTick())
}
select {
case <-timer.C:
logger.Trace("Tick.")
sched.executeAndReschedule(ctx)

case <-sched.interrupt:
logger.Trace("Interrupted waiting for next tick.")
timer.Stop()
case <-sched.interrupt:
logger.Trace("Interrupted waiting for next tick.")
timer.Stop()

case <-ctx.Done():
logger.Info("Exit the execution loop.")
timer.Stop()
return
}
case <-ctx.Done():
logger.Info("Exit the execution loop.")
timer.Stop()
return
}
}
}
Expand Down Expand Up @@ -439,7 +441,7 @@ func (sched *StdScheduler) calculateNextTick() time.Duration {
logger.Debug("Queue is empty")
return nextTickDuration
}
logger.Warnf("Failed to calculate next tick: %s", err)
logger.Errorf("Failed to calculate next tick: %s", err)
return sched.opts.RetryInterval
}
nextRunTime := scheduledJob.NextRunTime()
Expand Down
Loading