-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathegr.go
66 lines (57 loc) · 2.06 KB
/
egr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package egr
import (
"context"
"golang.org/x/sync/errgroup"
)
// Group is a collection of goroutines that process items of type T taken
// from a single shared queue.
//
// A Group must be created with WithContext: the zero value has a nil
// errgroup and a nil queue channel.
type Group[T any] struct {
	// group runs the worker goroutines and records the first error they return.
	group *errgroup.Group
	// queue carries items from Push to the workers; it is closed by Wait.
	queue chan T
}
// WithContext creates a Group[T] together with a context derived from ctx.
//
// The derived context is the one produced by errgroup.WithContext: it is
// canceled as soon as any goroutine started through the group returns a
// non-nil error.
//
// queueSize is the buffer capacity of the shared queue channel. Zero gives an
// unbuffered queue; a negative value makes the underlying make call panic.
func WithContext[T any](ctx context.Context, queueSize int) (*Group[T], context.Context) {
	eg, derived := errgroup.WithContext(ctx)
	g := &Group[T]{
		group: eg,
		queue: make(chan T, queueSize),
	}
	return g, derived
}
// SetLimit caps the number of goroutines that may be active in this group at
// the same time to n, by forwarding to the underlying errgroup.
//
// A negative n removes the limit. Once a limit is set, later calls to Go block
// until starting another goroutine would not exceed it. The limit must not be
// changed while any goroutine in the group is still active.
func (g *Group[T]) SetLimit(n int) {
	eg := g.group
	eg.SetLimit(n)
}
// TryGo starts f in a new goroutine only when the group currently has fewer
// active goroutines than its configured limit.
//
// f receives the group's queue as a receive-only channel. The result reports
// whether the goroutine was actually started.
func (g *Group[T]) TryGo(f func(queue <-chan T) error) bool {
	// Adapt f to the argument-less signature the errgroup expects.
	worker := func() error {
		return f(g.queue)
	}
	started := g.group.TryGo(worker)
	return started
}
// Go starts f in a new goroutine, handing it the group's queue as a
// receive-only channel of T.
//
// The first non-nil error returned by any goroutine cancels the group's
// context and is the error reported by Wait.
func (g *Group[T]) Go(f func(queue <-chan T) error) {
	// Adapt f to the argument-less signature the errgroup expects.
	worker := func() error {
		return f(g.queue)
	}
	g.group.Go(worker)
}
// Push sends item into the queue, blocking until the queue accepts it or ctx
// is done.
//
// It returns nil once the item has been handed to the queue. If ctx is done,
// Push returns ctx.Err() instead. Cancellation takes priority: when ctx is
// already done at the time of the call, the item is never enqueued, even if
// the queue still has free capacity.
//
// Passing the context returned by WithContext lets Push unblock when a worker
// fails, since that context is canceled on the first worker error.
//
// Push must not be called after Wait: Wait closes the queue, and sending on a
// closed channel panics.
func (g *Group[T]) Push(ctx context.Context, item T) error {
	// A select with several ready cases picks one at random, so without this
	// check an already-canceled ctx could still lose to a queue with room.
	if err := ctx.Err(); err != nil {
		return err
	}

	select {
	case g.queue <- item:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Wait closes the queue and then blocks until every goroutine in the group
// has returned. It reports the first non-nil error returned by any of them,
// or nil if there was none.
//
// Because Wait closes the queue channel, it must be called only once, and only
// after the last Push: a second close, or a send after close, panics.
func (g *Group[T]) Wait() error {
	// Closing first signals workers receiving from the queue that no more
	// items will arrive.
	close(g.queue)

	err := g.group.Wait()
	return err
}