Skip to content

Commit

Permalink
fix: more robust handling of panics
Browse files Browse the repository at this point in the history
Switched from `sync.WaitGroup`s to semaphores for more robust panic 'recovery'. Now, if `Process.Process` or `Process.Cancel` panics it won't affect the amount of concurrent goroutines running in `ProcessConcurrently` and `ProcessBatchConcurrently`.
  • Loading branch information
marksalpeter committed Jun 29, 2021
1 parent fd3384c commit 87f2017
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 78 deletions.
2 changes: 1 addition & 1 deletion mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type mockProcessor struct {
errs []interface{}
}

// Process waits processDuration before returning its input at its output
// Process waits processDuration before returning its input as its output
func (m *mockProcessor) Process(ctx context.Context, i interface{}) (interface{}, error) {
select {
case <-ctx.Done():
Expand Down
55 changes: 28 additions & 27 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package pipeline

import (
"context"
"sync"

"github.com/deliveryhero/pipeline/semaphore"
)

// Process takes each input from the `in <-chan interface{}` and calls `Processor.Process` on it.
Expand All @@ -12,7 +13,9 @@ import (
func Process(ctx context.Context, processor Processor, in <-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
process(ctx, processor, in, out)
for i := range in {
process(ctx, processor, i, out)
}
close(out)
}()
return out
Expand All @@ -23,42 +26,40 @@ func Process(ctx context.Context, processor Processor, in <-chan interface{}) <-
func ProcessConcurrently(ctx context.Context, concurrently int, p Processor, in <-chan interface{}) <-chan interface{} {
// Create the out chan
out := make(chan interface{})
// Close the out chan after all of the Processors finish executing
var wg sync.WaitGroup
wg.Add(concurrently)
go func() {
wg.Wait()
// Perform Process concurrently times
sem := semaphore.New(concurrently)
for i := range in {
sem.Add(1)
go func(i interface{}) {
process(ctx, p, i, out)
sem.Done()
}(i)
}
// Close the out chan after all of the Processors finish executing
sem.Wait()
close(out)
}()
// Perform Process concurrently times
for i := 0; i < concurrently; i++ {
go func() {
process(ctx, p, in, out)
wg.Done()
}()
}
return out
}

func process(
ctx context.Context,
processor Processor,
in <-chan interface{},
i interface{},
out chan<- interface{},
) {
for i := range in {
select {
// When the context is canceled, Cancel all inputs
case <-ctx.Done():
processor.Cancel(i, ctx.Err())
// Otherwise, Process all inputs
default:
result, err := processor.Process(ctx, i)
if err != nil {
processor.Cancel(i, err)
continue
}
out <- result
select {
// When the context is canceled, Cancel all inputs
case <-ctx.Done():
processor.Cancel(i, ctx.Err())
// Otherwise, Process all inputs
default:
result, err := processor.Process(ctx, i)
if err != nil {
processor.Cancel(i, err)
return
}
out <- result
}
}
92 changes: 54 additions & 38 deletions process_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package pipeline

import (
"context"
"sync"
"time"

"github.com/deliveryhero/pipeline/semaphore"
)

// ProcessBatch collects up to maxSize elements over maxDuration and processes them together as a slice of `interface{}`s.
Expand All @@ -19,7 +20,11 @@ func ProcessBatch(
) <-chan interface{} {
out := make(chan interface{})
go func() {
processBatch(ctx, maxSize, maxDuration, processor, in, out)
for {
if !processOneBatch(ctx, maxSize, maxDuration, processor, in, out) {
break
}
}
close(out)
}()
return out
Expand All @@ -37,55 +42,66 @@ func ProcessBatchConcurrently(
) <-chan interface{} {
// Create the out chan
out := make(chan interface{})
// Close the out chan after all of the Processors finish executing
var wg sync.WaitGroup
wg.Add(concurrently)
go func() {
wg.Wait()
// Perform Process concurrently times
sem := semaphore.New(concurrently)
lctx, done := context.WithCancel(context.Background())
for !isDone(lctx) {
sem.Add(1)
go func() {
if !processOneBatch(ctx, maxSize, maxDuration, processor, in, out) {
done()
}
sem.Done()
}()
}
// Close the out chan after all of the Processors finish executing
sem.Wait()
close(out)
done() // Satisfy go-vet
}()
// Perform Process concurrently times
for i := 0; i < concurrently; i++ {
go func() {
processBatch(ctx, maxSize, maxDuration, processor, in, out)
wg.Done()
}()
}
return out
}

func processBatch(
// isDone returns true if the context is canceled
func isDone(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

// processOneBatch processes one batch of inputs from the in chan.
// It returns true if the in chan is still open.
func processOneBatch(
ctx context.Context,
maxSize int,
maxDuration time.Duration,
processor Processor,
in <-chan interface{},
out chan<- interface{},
) {
for {
// Collect interfaces for batch processing
is, open := collect(ctx, maxSize, maxDuration, in)
if is != nil {
select {
// Cancel all inputs during shutdown
case <-ctx.Done():
processor.Cancel(is, ctx.Err())
// Otherwise Process the inputs
default:
results, err := processor.Process(ctx, is)
if err != nil {
processor.Cancel(is, err)
continue
}
// Split the results back into interfaces
for _, result := range results.([]interface{}) {
out <- result
}
) (open bool) {
// Collect interfaces for batch processing
is, open := collect(ctx, maxSize, maxDuration, in)
if is != nil {
select {
// Cancel all inputs during shutdown
case <-ctx.Done():
processor.Cancel(is, ctx.Err())
// Otherwise Process the inputs
default:
results, err := processor.Process(ctx, is)
if err != nil {
processor.Cancel(is, err)
return open
}
// Split the results back into interfaces
for _, result := range results.([]interface{}) {
out <- result
}
}
// In is closed
if !open {
return
}
}
return open
}
25 changes: 13 additions & 12 deletions process_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ func Test_processBatch(t *testing.T) {
maxDuration: maxTestDuration,
processor: &mockProcessor{
// this will take longer to complete than the maxTestDuration by a few micro seconds
processDuration: maxTestDuration / 10, // 5 calls to Process > maxTestDuration / 2
cancelDuration: maxTestDuration / 10, // 5 calls to Cancel > maxTestDuration / 2
processDuration: maxTestDuration / 10, // 5 calls to Process > maxTestDuration / 2
cancelDuration: maxTestDuration/10 + 25*time.Millisecond, // 5 calls to Cancel > maxTestDuration / 2
},
in: Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
out: drain,
Expand Down Expand Up @@ -295,17 +295,18 @@ func Test_processBatch(t *testing.T) {

// Process the batch with a timeout of maxTestDuration
timeout := time.After(maxTestDuration)
done := make(chan interface{})
open := true
go func() {
processBatch(ctx, tt.args.maxSize, tt.args.maxDuration, tt.args.processor, tt.args.in, tt.args.out)
close(done)
}()
select {
case <-timeout:
case <-done:
open = false
break
loop:
for {
select {
case <-timeout:
break loop
default:
open = processOneBatch(ctx, tt.args.maxSize, tt.args.maxDuration, tt.args.processor, tt.args.in, tt.args.out)
if !open {
break loop
}
}
}

// Processing took longer than expected
Expand Down
67 changes: 67 additions & 0 deletions semaphore/semaphore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Package semaphore is like a sync.WaitGroup with an upper limit.
// It's useful for limiting concurrent operations.
//
// Example Usage
//
// // startMultiplying is a pipeline step that concurrently multiplies input numbers by a factor
// func startMultiplying(concurrency, factor int, in <-chan int) <-chan int {
// out := make(chan int)
// go func() {
// sem := semaphore.New(concurrency)
// for i := range in {
// // Multiply up to 'concurrency' inputs at once
// sem.Add(1)
// go func() {
// out <- factor * i
// sem.Done()
// }()
// }
// // Wait for all multiplications to finish before closing the output chan
// sem.Wait()
// close(out)
// }()
// return out
// }
//
package semaphore

// Semaphore is like a sync.WaitGroup, except it has a maximum
// number of items that can be added. If that maximum is reached,
// Add will block until Done is called.
type Semaphore chan struct{}

// New returns a new Semaphore
func New(max int) Semaphore {
// There are probably more memory efficient ways to implement
// a semaphore using runtime primitives like runtime_SemacquireMutex
return make(Semaphore, max)
}

// Add adds delta, which may be negative, to the semaphore buffer.
// If the buffer becomes 0, all goroutines blocked by Wait are released.
// If the buffer goes negative, Add will block until another goroutine makes it positive.
// If the buffer exceeds max, Add will block until another goroutine decrements the buffer.
func (s Semaphore) Add(delta int) {
// Increment the semaphore
for i := delta; i > 0; i-- {
s <- struct{}{}
}
// Decrement the semaphore
for i := delta; i < 0; i++ {
<-s
}
}

// Done decrements the semaphore by 1
func (s Semaphore) Done() {
s.Add(-1)
}

// Wait blocks until the semaphore is buffer is empty
func (s Semaphore) Wait() {
// Filling the buffered channel ensures that its empty
s.Add(cap(s))
// Free the buffer before closing (unsure if this matters)
s.Add(-cap(s))
close(s)
}

0 comments on commit 87f2017

Please sign in to comment.