Skip to content

Commit

Permalink
Add option WithoutGoroutines for changing behaviour in tasks spawning
Browse files Browse the repository at this point in the history
  • Loading branch information
mshsmlv committed Feb 25, 2021
1 parent 4567874 commit 465cb83
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
34 changes: 33 additions & 1 deletion group.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type Group struct {
ctx context.Context
cancel func()

withoutGoroutines bool

// Task runner and a heap of tasks to be run.
wg sync.WaitGroup
mu sync.Mutex
Expand All @@ -34,9 +36,25 @@ type Group struct {
lenC chan int
}

// Option allows to tune Group behaviour on creation.
type Option func(*Group)

// WithoutGoroutines forces Group to handle tasks in the main scheduler thread
// (whereas by default task will be spawned in a separate goroutine). Useful when
// you need to schedule many small tasks somewhere in the future and do not
// want to spawn a goroutine for each of them.
//
// N.B. Hanging task will block Group from stopping or processing further tasks,
// so it's a user responsibility to deal with long running tasks.
func WithoutGoroutines() Option {
return func(g *Group) {
g.withoutGoroutines = true
}
}

// New creates a new Group which will use ctx for cancelation. If cancelation
// is not a concern, use context.Background().
func New(ctx context.Context) *Group {
func New(ctx context.Context, opts ...Option) *Group {
// Monitor goroutine context and cancelation.
mctx, cancel := context.WithCancel(ctx)

Expand All @@ -50,6 +68,10 @@ func New(ctx context.Context) *Group {
lenC: make(chan int),
}

for _, opt := range opts {
opt(g)
}

g.wg.Add(1)
go func() {
defer g.wg.Done()
Expand Down Expand Up @@ -207,6 +229,10 @@ func (g *Group) trigger(now time.Time) time.Time {
}()

for g.tasks.Len() > 0 {
if g.ctx.Err() != nil {
return time.Time{}
}

next := &g.tasks[0]
if next.Deadline.After(now) {
// Earliest scheduled task is not ready.
Expand All @@ -215,6 +241,12 @@ func (g *Group) trigger(now time.Time) time.Time {

// This task is ready, pop it from the heap and run it.
t := heap.Pop(&g.tasks).(task)

if g.withoutGoroutines {
t.Call()
continue
}

g.wg.Add(1)
go func() {
defer g.wg.Done()
Expand Down
33 changes: 33 additions & 0 deletions group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,39 @@ func TestGroupWaitAfterScheduled(t *testing.T) {
}
}

func TestGroupWithoutGoroutines(t *testing.T) {

// Ensure option WithoutGoroutines() blocks spawning new goroutine for each task.
// The idea of a test is running a blocking task and a usual task. The second one should not start.

ctx, cancel := context.WithCancel(context.Background())
sg := schedgroup.New(ctx, schedgroup.WithoutGoroutines())

// Schedule blocking task in the past to run it immediately.
sg.Schedule(time.Now().Add(-1*time.Second), func() {
select {
case <-ctx.Done():
return
}
})

// Schedule a second task that shouldn't start while the first is working.
sg.Schedule(time.Now().Add(500*time.Millisecond), func() {
t.Fatalf("WithoutGoroutines option is not working")
})

var wg sync.WaitGroup
wg.Add(1)
go func() {
_ = sg.Wait()
wg.Done()
}()

time.Sleep(1 * time.Second) // Wait to consider that the second task hasn't run.
cancel() // Stop Group and the first task.
wg.Wait()
}

// This example demonstrates typical use of a Group.
func ExampleGroup_wait() {
// Create a Group which will not use a context for cancelation.
Expand Down

0 comments on commit 465cb83

Please sign in to comment.