Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option WithoutGoroutines for changing behaviour in tasks spawning #1

Merged
merged 1 commit into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion group.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type Group struct {
ctx context.Context
cancel func()

withoutGoroutines bool

// Task runner and a heap of tasks to be run.
wg sync.WaitGroup
mu sync.Mutex
Expand All @@ -34,9 +36,25 @@ type Group struct {
lenC chan int
}

// Option allows to tune Group behaviour on creation.
type Option func(*Group)

// WithoutGoroutines forces Group to handle tasks in the main scheduler thread
// (whereas by default task will be spawned in a separate goroutine). Useful when
// you need to schedule many small tasks somewhere in the future and do not
// want to spawn a goroutine for each of them.
//
// N.B. Hanging task will block Group from stopping or processing further tasks,
// so it's a user responsibility to deal with long running tasks.
func WithoutGoroutines() Option {
return func(g *Group) {
g.withoutGoroutines = true
}
}

// New creates a new Group which will use ctx for cancelation. If cancelation
// is not a concern, use context.Background().
func New(ctx context.Context) *Group {
func New(ctx context.Context, opts ...Option) *Group {
// Monitor goroutine context and cancelation.
mctx, cancel := context.WithCancel(ctx)

Expand All @@ -50,6 +68,10 @@ func New(ctx context.Context) *Group {
lenC: make(chan int),
}

for _, opt := range opts {
opt(g)
}

g.wg.Add(1)
go func() {
defer g.wg.Done()
Expand Down Expand Up @@ -207,6 +229,10 @@ func (g *Group) trigger(now time.Time) time.Time {
}()

for g.tasks.Len() > 0 {
if g.ctx.Err() != nil {
return time.Time{}
}

next := &g.tasks[0]
if next.Deadline.After(now) {
// Earliest scheduled task is not ready.
Expand All @@ -215,6 +241,12 @@ func (g *Group) trigger(now time.Time) time.Time {

// This task is ready, pop it from the heap and run it.
t := heap.Pop(&g.tasks).(task)

if g.withoutGoroutines {
t.Call()
continue
}

g.wg.Add(1)
go func() {
defer g.wg.Done()
Expand Down
33 changes: 33 additions & 0 deletions group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,39 @@ func TestGroupWaitAfterScheduled(t *testing.T) {
}
}

func TestGroupWithoutGoroutines(t *testing.T) {

// Ensure option WithoutGoroutines() blocks spawning new goroutine for each task.
// The idea of a test is running a blocking task and a usual task. The second one should not start.

ctx, cancel := context.WithCancel(context.Background())
sg := schedgroup.New(ctx, schedgroup.WithoutGoroutines())

// Schedule blocking task in the past to run it immediately.
sg.Schedule(time.Now().Add(-1*time.Second), func() {
select {
case <-ctx.Done():
return
}
})

// Schedule a second task that shouldn't start while the first is working.
sg.Schedule(time.Now().Add(500*time.Millisecond), func() {
t.Fatalf("WithoutGoroutines option is not working")
})

var wg sync.WaitGroup
wg.Add(1)
go func() {
_ = sg.Wait()
wg.Done()
}()

time.Sleep(1 * time.Second) // Wait to consider that the second task hasn't run.
cancel() // Stop Group and the first task.
wg.Wait()
}

// This example demonstrates typical use of a Group.
func ExampleGroup_wait() {
// Create a Group which will not use a context for cancelation.
Expand Down