Skip to content

Commit

Permalink
fix: more efficient ProcessConcurrently func
Browse files Browse the repository at this point in the history
added full test coverage and example for ProcessConcurrently
  • Loading branch information
marksalpeter committed May 2, 2021
1 parent 05fd2dc commit 1dcf5b3
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 65 deletions.
22 changes: 22 additions & 0 deletions example/processors/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package processors

import (
"context"
"errors"
"log"
"time"
)

// Multiplier is a simple processor that multiplies each integer it receives by some Factor
Expand Down Expand Up @@ -38,3 +40,23 @@ func (m *BatchMultiplier) Process(_ context.Context, ins interface{}) (interface
func (m *BatchMultiplier) Cancel(i interface{}, err error) {
	// Report the rejected input together with the reason it was rejected.
	const format = "error: could not multiply %+v, %s\n"
	log.Printf(format, i, err)
}

// Waiter is a Processor that waits for Duration before returning its output
type Waiter struct {
Duration time.Duration
}

// Process waits for `Waiter.Duration` before returning the value passed in
func (w *Waiter) Process(ctx context.Context, in interface{}) (interface{}, error) {
select {
case <-time.After(w.Duration):
return in, nil
case <-ctx.Done():
return nil, errors.New("process was canceled")
}
}

// Cancel is called when the context is canceled
func (w *Waiter) Cancel(i interface{}, err error) {
log.Printf("error: could not process %+v, %s\n", i, err)
}
59 changes: 59 additions & 0 deletions mocks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package pipeline

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// mockProcess is a mock of the Processor interface
type mockProcessor struct {
processDuration time.Duration
cancelDuration time.Duration
processReturnsErrs bool
processed []interface{}
canceled []interface{}
errs []interface{}
}

// Process waits processDuration before returning its input at its output
func (m *mockProcessor) Process(ctx context.Context, i interface{}) (interface{}, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(m.processDuration):
break
}
if m.processReturnsErrs {
return nil, fmt.Errorf("process error: %d", i)
}
m.processed = append(m.processed, i)
return i, nil
}

// Cancel collects all inputs that were canceled in m.canceled
func (m *mockProcessor) Cancel(i interface{}, err error) {
time.Sleep(m.cancelDuration)
m.canceled = append(m.canceled, i)
m.errs = append(m.errs, err.Error())
}

// containsAll returns true if a and b contain the same elements the same
// number of times, in any order, or if both are empty / nil.
// Elements are used as map keys, so they must be comparable.
func containsAll(a, b []interface{}) bool {
	if len(a) != len(b) {
		return false
	}
	// Count occurrences instead of tracking mere presence, so that
	// duplicates in one slice cannot stand in for a missing element.
	remaining := make(map[interface{}]int, len(a))
	for _, v := range a {
		remaining[v]++
	}
	for _, v := range b {
		if remaining[v] == 0 {
			return false
		}
		remaining[v]--
	}
	return true
}
69 changes: 51 additions & 18 deletions process.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package pipeline

import "context"
import (
"context"
"sync"
)

// Process takes each input from the `in <-chan interface{}` and calls `Processor.Process` on it.
// When `Processor.Process` returns an `interface{}`, it will be sent to the output `<-chan interface{}`.
Expand All @@ -9,23 +12,53 @@ import "context"
func Process(ctx context.Context, processor Processor, in <-chan interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		// Close out exactly once, after process has drained in, so that
		// consumers ranging over the returned channel terminate.
		defer close(out)
		// process either forwards each input's result to out or hands the
		// input to processor.Cancel.
		process(ctx, processor, in, out)
	}()
	return out
}

// ProcessConcurrently fans the in channel out to multiple Processors running concurrently,
// then it fans the out channels of the Processors back into a single out chan.
//
// The same Processor p is shared by every worker goroutine. A concurrently
// value below 1 is treated as 1, so that in is always drained and the
// returned channel is closed only after all inputs have been handled.
func ProcessConcurrently(ctx context.Context, concurrently int, p Processor, in <-chan interface{}) <-chan interface{} {
	// A negative count would panic in wg.Add, and zero would close out
	// without ever reading from in.
	if concurrently < 1 {
		concurrently = 1
	}

	// Create the out chan shared by all workers
	out := make(chan interface{})

	// Perform process concurrently times
	var wg sync.WaitGroup
	wg.Add(concurrently)
	for i := 0; i < concurrently; i++ {
		go func() {
			defer wg.Done()
			process(ctx, p, in, out)
		}()
	}

	// Close the out chan after all of the workers finish executing
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// process reads inputs from in until it is closed. Each input is either
// passed to processor.Process, with a successful result sent to out, or
// handed to processor.Cancel when the context is already done or when
// processor.Process returns an error.
func process(
	ctx context.Context,
	processor Processor,
	in <-chan interface{},
	out chan<- interface{},
) {
	for {
		input, open := <-in
		if !open {
			return
		}

		// Non-blocking check: once the context is done, inputs are
		// canceled instead of processed.
		select {
		case <-ctx.Done():
			processor.Cancel(input, ctx.Err())
			continue
		default:
		}

		output, err := processor.Process(ctx, input)
		if err == nil {
			out <- output
			continue
		}
		processor.Cancel(input, err)
	}
}
10 changes: 0 additions & 10 deletions process_concurrently.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,6 @@ import (
"time"
)

// ProcessConcurrently starts `concurrently` calls to Process that all read
// from the same in channel, then passes their output channels to Merge and
// returns the resulting channel.
func ProcessConcurrently(ctx context.Context, concurrently int, p Processor, in <-chan interface{}) <-chan interface{} {
	var workers []<-chan interface{}
	for remaining := concurrently; remaining > 0; remaining-- {
		workers = append(workers, Process(ctx, p, in))
	}
	return Merge(workers...)
}

// ProcessBatchConcurrently fans the in channel out to multiple batch Processors running concurrently,
// then it fans the out channels of the batch Processors back into a single out chan
func ProcessBatchConcurrently(
Expand Down
29 changes: 29 additions & 0 deletions process_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,32 @@ func ExampleProcess() {
// result: 50
// error: could not multiply 6, context deadline exceeded
}

func ExampleProcessConcurrently() {
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-7
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7)

// Wait 2 seconds to pass each number through the pipe
// * 2 concurrent Processors
p = pipeline.ProcessConcurrently(ctx, 2, &processors.Waiter{
Duration: 2 * time.Second,
}, p)

// Finally, lets print the results and see what happened
for result := range p {
log.Printf("result: %d\n", result)
}

// Output
// result: 2
// result: 1
// result: 4
// result: 3
// error: could not process 6, process was canceled
// error: could not process 5, process was canceled
// error: could not process 7, context deadline exceeded
}
Loading

0 comments on commit 1dcf5b3

Please sign in to comment.