feat: add dynamic limit mode update functionality, closes #768 #788

Status: Open · wants to merge 1 commit into base: v2
77 changes: 77 additions & 0 deletions scheduler.go
@@ -3,8 +3,10 @@ package gocron

import (
"context"
"fmt"
"reflect"
"runtime"
"sync"
"time"

"github.com/google/uuid"
@@ -47,6 +49,10 @@ type Scheduler interface {
// JobsWaitingInQueue number of jobs waiting in Queue in case of LimitModeWait
// In case of LimitModeReschedule or no limit it will be always zero
JobsWaitingInQueue() int
// GetLimitMode returns the current limit mode configuration of the scheduler
GetLimitMode() *limitModeConfig
// UpdateScheduler updates the scheduler with the provided options
UpdateScheduler(options ...SchedulerUpdateOption) error
}

// -----------------------------------------------
@@ -779,6 +785,77 @@ func (s *scheduler) JobsWaitingInQueue() int {
return 0
}

func (s *scheduler) GetLimitMode() *limitModeConfig {
return s.exec.limitMode
}

// SchedulerUpdateOption defines the function for setting
// update options on the Scheduler.
type SchedulerUpdateOption func(*scheduler) error

func (s *scheduler) UpdateScheduler(options ...SchedulerUpdateOption) error {
stopped := false
// If the scheduler is running, we need to stop it
if s.started {
if err := s.StopJobs(); err != nil {
return err
}
stopped = true
}

for _, option := range options {
if err := option(s); err != nil {
return err
}
}

if stopped {
s.Start()
}

return nil
}

// WithUpdateLimitMode updates the limit mode of the scheduler
func WithUpdateLimitMode(limit uint, mode LimitMode) SchedulerUpdateOption {
return func(s *scheduler) error {
if limit == 0 {
return ErrWithLimitConcurrentJobsZero
}

s.logger.Debug(fmt.Sprintf("Updating limit mode to: %v, limit: %d", mode, limit))

// Create a new limitModeConfig
newLimitMode := &limitModeConfig{
mode: mode,
limit: limit,
in: make(chan jobIn, 1000),
singletonJobs: make(map[uuid.UUID]struct{}),
}

if mode == LimitModeReschedule {
newLimitMode.rescheduleLimiter = make(chan struct{}, limit)
}

var wg sync.WaitGroup

// If there's an existing limitMode, we need to transfer the queued jobs
if s.exec.limitMode != nil {
s.logger.Debug("Transferring jobs from old limit mode to new limit mode")
// ... (rest of the code remains the same)
}

// Wait for all goroutines to complete
wg.Wait()

// Update the scheduler with the new limit mode configuration
s.exec.limitMode = newLimitMode
s.logger.Debug("Limit mode update completed")

return nil
}
}

// -----------------------------------------------
// -----------------------------------------------
// ------------- Scheduler Options ---------------
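For reviewers, a minimal usage sketch of the new API from a caller's point of view, assuming the signatures shown in the scheduler.go diff above. UpdateScheduler, WithUpdateLimitMode, and GetLimitMode are the additions proposed in this PR; everything else is existing gocron v2 API, and the job, interval, and limits are illustrative only:

package main

import (
	"fmt"
	"time"

	"github.com/go-co-op/gocron/v2"
)

func main() {
	// Existing v2 API: start with at most 2 concurrent jobs; extra runs wait.
	s, err := gocron.NewScheduler(
		gocron.WithLimitConcurrentJobs(2, gocron.LimitModeWait),
	)
	if err != nil {
		panic(err)
	}

	// An illustrative job; any task works.
	if _, err := s.NewJob(
		gocron.DurationJob(time.Second),
		gocron.NewTask(func() { fmt.Println("tick") }),
	); err != nil {
		panic(err)
	}

	s.Start()

	// New in this PR: raise the limit and switch modes on the live scheduler.
	if err := s.UpdateScheduler(gocron.WithUpdateLimitMode(5, gocron.LimitModeReschedule)); err != nil {
		panic(err)
	}

	// Also new in this PR: inspect the active limit mode configuration.
	fmt.Printf("limit mode: %+v\n", s.GetLimitMode())

	_ = s.Shutdown()
}

Note that, as written in the diff, UpdateScheduler stops a running scheduler before applying the options and then restarts it, so in-flight jobs are subject to the configured stop timeout.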
209 changes: 209 additions & 0 deletions scheduler_test.go
@@ -2633,3 +2633,212 @@ func TestScheduler_WithMonitor(t *testing.T) {
})
}
}

func TestScheduler_GetLimitMode(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

tests := []struct {
name string
limitOpt SchedulerOption
expected *limitModeConfig
}{
{
name: "No limit mode set",
limitOpt: nil,
expected: nil,
},
{
name: "Limit mode set to wait",
limitOpt: WithLimitConcurrentJobs(5, LimitModeWait),
expected: &limitModeConfig{
mode: LimitModeWait,
limit: 5,
},
},
{
name: "Limit mode set to reschedule",
limitOpt: WithLimitConcurrentJobs(10, LimitModeReschedule),
expected: &limitModeConfig{
mode: LimitModeReschedule,
limit: 10,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
opts := []SchedulerOption{
WithLogger(NewLogger(LogLevelDebug)),
WithStopTimeout(time.Second),
}
if tt.limitOpt != nil {
opts = append(opts, tt.limitOpt)
}

s, err := NewScheduler(opts...)
require.NoError(t, err)

limitMode := s.GetLimitMode()
if tt.expected == nil {
assert.Nil(t, limitMode)
} else {
assert.NotNil(t, limitMode)
assert.Equal(t, tt.expected.mode, limitMode.mode)
assert.Equal(t, tt.expected.limit, limitMode.limit)
}

require.NoError(t, s.Shutdown())
})
}
}

func TestScheduler_UpdateScheduler(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

tests := []struct {
name string
initialLimitOpt SchedulerOption
updateOpt SchedulerUpdateOption
expectedMode LimitMode
expectedLimit uint
}{
{
name: "Update from no limit to wait mode",
initialLimitOpt: nil,
updateOpt: WithUpdateLimitMode(5, LimitModeWait),
expectedMode: LimitModeWait,
expectedLimit: 5,
},
{
name: "Update from wait to reschedule mode",
initialLimitOpt: WithLimitConcurrentJobs(3, LimitModeWait),
updateOpt: WithUpdateLimitMode(7, LimitModeReschedule),
expectedMode: LimitModeReschedule,
expectedLimit: 7,
},
{
name: "Update limit without changing mode",
initialLimitOpt: WithLimitConcurrentJobs(5, LimitModeReschedule),
updateOpt: WithUpdateLimitMode(10, LimitModeReschedule),
expectedMode: LimitModeReschedule,
expectedLimit: 10,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
opts := []SchedulerOption{
WithLogger(NewLogger(LogLevelDebug)),
WithStopTimeout(time.Second),
}
if tt.initialLimitOpt != nil {
opts = append(opts, tt.initialLimitOpt)
}

s, err := NewScheduler(opts...)
require.NoError(t, err)

s.Start()

err = s.UpdateScheduler(tt.updateOpt)
require.NoError(t, err)

limitMode := s.GetLimitMode()
assert.NotNil(t, limitMode)
assert.Equal(t, tt.expectedMode, limitMode.mode)
assert.Equal(t, tt.expectedLimit, limitMode.limit)

require.NoError(t, s.Shutdown())
})
}
}

func TestScheduler_UpdateSchedulerWithRunningJobs(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

jobRan := make(chan struct{}, 10)
jobStarted := make(chan struct{}, 10)

logger := NewLogger(LogLevelDebug)
s, err := NewScheduler(
WithLogger(logger),
WithStopTimeout(time.Second),
WithLimitConcurrentJobs(1, LimitModeWait),
)
require.NoError(t, err)

for range []int{1, 2, 3} {
_, err = s.NewJob(
DurationJob(100*time.Millisecond),
NewTask(func() {
jobStarted <- struct{}{}
time.Sleep(200 * time.Millisecond)
jobRan <- struct{}{}
}),
)
require.NoError(t, err)
}

s.Start()

// Wait for the first job to start
select {
case <-jobStarted:
logger.Debug("First job started")
case <-time.After(500 * time.Millisecond):
t.Fatal("Timed out waiting for first job to start")
}

// Update the scheduler while a job is running
err = s.UpdateScheduler(WithUpdateLimitMode(2, LimitModeReschedule))
require.NoError(t, err)

limitMode := s.GetLimitMode()
require.NotNil(t, limitMode)

logger.Debug(fmt.Sprintf("Current limit mode: %v, limit: %d", limitMode.mode, limitMode.limit))

assert.Equal(t, LimitModeReschedule, limitMode.mode, "Limit mode should be LimitModeReschedule")
assert.Equal(t, uint(2), limitMode.limit, "Limit should be 2")

// Wait for a few more job runs to ensure the scheduler is still functioning
jobCount := 0
timeout := time.After(2 * time.Second)
for jobCount < 3 {
select {
case <-jobRan:
jobCount++
logger.Debug(fmt.Sprintf("Job completed, count: %d", jobCount))
case <-timeout:
t.Fatalf("Timed out waiting for jobs to run. Only %d jobs completed.", jobCount)
}
}

// Initiate shutdown
logger.Debug("Initiating scheduler shutdown")
shutdownErr := make(chan error, 1)
go func() {
shutdownErr <- s.Shutdown()
}()

// Allow time for in-progress jobs to complete
time.Sleep(300 * time.Millisecond)

// Check for any jobs that might have started just before shutdown
jobsAfterShutdownInitiated := 0
for {
select {
case <-jobStarted:
jobsAfterShutdownInitiated++
logger.Debug(fmt.Sprintf("Job started after shutdown initiated: %d", jobsAfterShutdownInitiated))
case <-jobRan:
logger.Debug("Job completed after shutdown initiated")
case err := <-shutdownErr:
require.NoError(t, err, "Shutdown should complete without error")
logger.Debug("Scheduler shutdown completed")
return
case <-time.After(500 * time.Millisecond):
t.Fatal("Timed out waiting for scheduler to shut down")
}
}
}