Skip to content

Commit

Permalink
Add goroutine limit to the Group
Browse files Browse the repository at this point in the history
  • Loading branch information
mshsmlv committed Feb 25, 2021
1 parent 4567874 commit 24d7e21
Showing 1 changed file with 24 additions and 1 deletion.
25 changes: 24 additions & 1 deletion group.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type Group struct {
ctx context.Context
cancel func()

withoutGoroutines bool

// Task runner and a heap of tasks to be run.
wg sync.WaitGroup
mu sync.Mutex
Expand All @@ -34,9 +36,19 @@ type Group struct {
lenC chan int
}

type GroupOption func(*Group)

// WithoutGoroutines is the option for handling tasks
// which makes Group not spawn a new goroutine for each task.
func WithoutGoroutines() GroupOption {
return func(g *Group) {
g.withoutGoroutines = true
}
}

// New creates a new Group which will use ctx for cancelation. If cancelation
// is not a concern, use context.Background().
func New(ctx context.Context) *Group {
func New(ctx context.Context, opts ...GroupOption) *Group {
// Monitor goroutine context and cancelation.
mctx, cancel := context.WithCancel(ctx)

Expand All @@ -50,6 +62,10 @@ func New(ctx context.Context) *Group {
lenC: make(chan int),
}

for _, opt := range opts {
opt(g)
}

g.wg.Add(1)
go func() {
defer g.wg.Done()
Expand Down Expand Up @@ -210,11 +226,18 @@ func (g *Group) trigger(now time.Time) time.Time {
next := &g.tasks[0]
if next.Deadline.After(now) {
// Earliest scheduled task is not ready.

return next.Deadline
}

// This task is ready, pop it from the heap and run it.
t := heap.Pop(&g.tasks).(task)

if g.withoutGoroutines {
t.Call()
continue
}

g.wg.Add(1)
go func() {
defer g.wg.Done()
Expand Down

0 comments on commit 24d7e21

Please sign in to comment.