Skip to content

Commit

Permalink
Make Collector naming more ergonomic.
Browse files Browse the repository at this point in the history
I've gone back and forth on the terminology, but am settling on:

  Run:    func() or func() T          -- was NoError
  Call:   func() (T, error)           -- was Task
  Report: func(report func(T)) error  -- unchanged

This use of "Run" aligns with the top-level Run function, as well as the Run
method of the Group, which take no error.

This use of "Call" aligns with the top-level "Call" function, which wants a
value and an error.

Keep the existing method names as aliases, but mark them deprecated.

Also rename "NewCollector" as "Collect", which besides being shorter is also
more evocative of what it's doing. The construction is not the important aspect
of the collector, but what it provides.
  • Loading branch information
creachadair committed Sep 7, 2024
1 parent 46f2412 commit 2248bec
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 26 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,25 +272,25 @@ var sum int
c := taskgroup.NewCollector(func(v int) { sum += v })
```

The `Task`, `NoError`, and `Report` methods of `c` wrap a function that yields
a value into a task. If the function reports an error, that error is returned
The `Call`, `Run`, and `Report` methods of `c` wrap a function that yields a
value into a task. If the function reports an error, that error is returned
from the task as usual. Otherwise, its non-error value is given to the
accumulator callback. As in the above example, calls to the function are
serialized so that it is safe to access state without additional locking:

```go
// Report an error, no value for the collector.
g.Go(c.Task(func() (int, error) {
g.Go(c.Call(func() (int, error) {
return -1, errors.New("bad")
}))

// Report the value 25 to the collector.
g.Go(c.Task(func() (int, error) {
g.Go(c.Call(func() (int, error) {
return 25, nil
}))

// Report a random integer to the collector.
g.Go(c.NoError(func() int { return rand.Intn(1000) })
g.Go(c.Run(func() int { return rand.Intn(1000) })

// Report multiple values to the collector.
g.Go(c.Report(func(report func(int)) error {
Expand Down
33 changes: 24 additions & 9 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,28 @@ func (c *Collector[T]) report(v T) {
c.handle(v)
}

// NewCollector creates a new collector that delivers task values to the
// specified accumulator function. The collector serializes calls to value, so
// that it is safe for the function to access shared state without a lock.
// NewCollector is an alias for [Collect].
//
// Deprecated: Use Collect instead.
func NewCollector[T any](value func(T)) *Collector[T] { return Collect(value) }

// Collect creates a new collector that delivers task values to the specified
// accumulator function. The collector serializes calls to value, so that it is
// safe for the function to access shared state without a lock.
//
// The tasks created from a collector do not return until all the values
// reported by the underlying function have been processed by the accumulator.
func NewCollector[T any](value func(T)) *Collector[T] { return &Collector[T]{handle: value} }
func Collect[T any](value func(T)) *Collector[T] { return &Collector[T]{handle: value} }

// Task is an alias for Call.
//
// Deprecated: Use Call instead.
func (c *Collector[T]) Task(f func() (T, error)) Task { return c.Call(f) }

// Task returns a Task wrapping a call to f. If f reports an error, that error
// Call returns a Task wrapping a call to f. If f reports an error, that error
// is propagated as the return value of the task; otherwise, the non-error
// value reported by f is passed to the value callback.
func (c *Collector[T]) Task(f func() (T, error)) Task {
func (c *Collector[T]) Call(f func() (T, error)) Task {
return func() error {
v, err := f()
if err != nil {
Expand All @@ -45,8 +55,13 @@ func (c *Collector[T]) Report(f func(report func(T)) error) Task {
return func() error { return f(c.report) }
}

// NoError returns a Task wrapping a call to f. The resulting task reports a
// nil error for all calls.
func (c *Collector[T]) NoError(f func() T) Task {
// NoError is an alias for Run.
//
// Deprecated: Use Run instead.
func (c *Collector[T]) NoError(f func() T) Task { return c.Run(f) }

// Run returns a Task wrapping a call to f. The resulting task reports a nil
// error for all calls.
func (c *Collector[T]) Run(f func() T) Task {
return NoError(func() { c.report(f()) })
}
4 changes: 2 additions & 2 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func ExampleSingle() {

func ExampleCollector() {
var total int
c := taskgroup.NewCollector(func(v int) {
c := taskgroup.Collect(func(v int) {
total += v
})

Expand Down Expand Up @@ -190,7 +190,7 @@ func ExampleCollector_Report() {
who string
v int
}
c := taskgroup.NewCollector(func(z val) { fmt.Println(z.who, z.v) })
c := taskgroup.Collect(func(z val) { fmt.Println(z.who, z.v) })

err := taskgroup.New(nil).
// The Report method passes its argument a function to report multiple
Expand Down
20 changes: 10 additions & 10 deletions taskgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,11 @@ func TestSingleTask(t *testing.T) {
return <-release
})

g := taskgroup.New(nil).Go(taskgroup.NoError(func() {
g := taskgroup.New(nil).Run(func() {
if err := s.Wait(); err != sentinel {
t.Errorf("Background Wait: got %v, want %v", err, sentinel)
}
}))
})

release <- sentinel
if err := s.Wait(); err != sentinel {
Expand All @@ -212,7 +212,7 @@ func TestWaitMoreTasks(t *testing.T) {
defer leaktest.Check(t)()

var results int
coll := taskgroup.NewCollector(func(int) {
coll := taskgroup.Collect(func(int) {
results++
})

Expand All @@ -226,14 +226,14 @@ func TestWaitMoreTasks(t *testing.T) {
if n > 1 {
// The subordinate task, if there is one, is started before this one
// exits, ensuring the group is kept "afloat".
g.Go(coll.NoError(func() int {
g.Go(coll.Run(func() int {
return countdown(n - 1)
}))
}
return n
}

g.Go(coll.NoError(func() int { return countdown(15) }))
g.Go(coll.Run(func() int { return countdown(15) }))
g.Wait()

if results != 15 {
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestSingleResult(t *testing.T) {

func TestCollector(t *testing.T) {
var sum int
c := taskgroup.NewCollector(func(v int) { sum += v })
c := taskgroup.Collect(func(v int) { sum += v })

vs := rand.Perm(15)
g := taskgroup.New(nil)
Expand All @@ -272,15 +272,15 @@ func TestCollector(t *testing.T) {
v := v
if v > 10 {
// This value should not be accumulated.
g.Go(c.Task(func() (int, error) {
g.Go(c.Call(func() (int, error) {
return -100, errors.New("don't add this")
}))
} else if i%2 == 0 {
// A function with an error.
g.Go(c.Task(func() (int, error) { return v, nil }))
g.Go(c.Call(func() (int, error) { return v, nil }))
} else {
// A function without an error.
g.Go(c.NoError(func() int { return v }))
g.Go(c.Run(func() int { return v }))
}
}
g.Wait() // wait for tasks to finish
Expand All @@ -292,7 +292,7 @@ func TestCollector(t *testing.T) {

func TestCollector_Report(t *testing.T) {
var sum int
c := taskgroup.NewCollector(func(v int) { sum += v })
c := taskgroup.Collect(func(v int) { sum += v })

g := taskgroup.New(nil).Go(c.Report(func(report func(v int)) error {
for _, v := range rand.Perm(10) {
Expand Down

0 comments on commit 2248bec

Please sign in to comment.