Skip to content

Commit

Permalink
feat(pipeline): added a batch processor
Browse files Browse the repository at this point in the history
Needs some more test improvements
  • Loading branch information
marksalpeter committed Mar 21, 2021
1 parent 5db8219 commit b27f24f
Show file tree
Hide file tree
Showing 12 changed files with 637 additions and 76 deletions.
32 changes: 32 additions & 0 deletions cancel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package util

import "context"

// Cancel passes the in chan to the out chan until the context is canceled.
// Once the context is canceled, every remaining value read from in is passed
// to the canceled func instead of out. This includes a value that was already
// read from in but could not yet be delivered to out.
// Out closes when in is closed, so the caller must close in for the goroutine
// started here to exit.
func Cancel(ctx context.Context, canceled func(i interface{}), in <-chan interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)

		// drain hands everything left on in to canceled until in is closed.
		drain := func() {
			for i := range in {
				canceled(i)
			}
		}

		for {
			select {
			case i, open := <-in:
				if !open {
					return
				}
				// select chooses at random among ready cases, so this branch
				// can win even though the context is already done. Re-check
				// so that nothing reaches out after cancellation.
				if ctx.Err() != nil {
					canceled(i)
					drain()
					return
				}
				select {
				case out <- i:
				// The context was canceled while waiting for a reader: don't
				// block on out forever, route the pending value to canceled.
				case <-ctx.Done():
					canceled(i)
					drain()
					return
				}
			// When the context is canceled, pass all ins to the
			// canceled func until in is closed
			case <-ctx.Done():
				drain()
				return
			}
		}
	}()
	return out
}
57 changes: 57 additions & 0 deletions cancel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package util

import (
"context"
"reflect"
"testing"
"time"
)

// TestCancel checks that Cancel accounts for every input exactly once and in
// order: values are delivered to out first and, once the context is canceled,
// the rest are delivered to the canceled func. Out must close after in closes.
func TestCancel(t *testing.T) {
	t.Run("in closes after the context is canceled", func(t *testing.T) {
		// Emit sequential ints for 10 milliseconds, then close in.
		// emitted is only read after out has closed, which happens after
		// in is closed, so no extra synchronization is needed.
		var emitted int
		in := make(chan interface{})
		go func() {
			defer close(in)
			endAt := time.Now().Add(10 * time.Millisecond)
			for now := time.Now(); now.Before(endAt); now = time.Now() {
				in <- emitted
				emitted++
			}
		}()

		// Record everything handed to the canceled func
		var dropped []interface{}
		canceled := func(i interface{}) {
			dropped = append(dropped, i)
		}

		// Context times out after 1 millisecond, well before in closes
		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
		defer cancel()

		// Record everything that makes it through to out
		var passed []interface{}
		for o := range Cancel(ctx, canceled, in) {
			passed = append(passed, o)
		}

		// Everything emitted must show up exactly once: first on out, then
		// in the canceled func, preserving the order it was emitted in.
		want := make([]interface{}, 0, emitted)
		for i := 0; i < emitted; i++ {
			want = append(want, i)
		}
		got := make([]interface{}, 0, len(passed)+len(dropped))
		got = append(got, passed...)
		got = append(got, dropped...)
		if !reflect.DeepEqual(want, got) {
			t.Errorf("emitted %d values, but got %d on out and %d canceled: %v", emitted, len(passed), len(dropped), got)
		}
	})
}
18 changes: 10 additions & 8 deletions collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import (
"time"
)

// Collect outputs `max` interfaces at a time from `in` or whatever came through during the last `duration`
func Collect(max int, duration time.Duration, in <-chan interface{}) <-chan []interface{} {
out := make(chan []interface{})
// Collect collects up to maxSize inputs over up to maxDuration before returning them as []interface{}.
// If maxSize is reached before maxDuration, [maxSize]interface{} will be returned.
// If maxDuration is reached before maxSize is collected, [<maxSize]interface{} will be returned.
// If no inputs are collected over maxDuration, no outputs will be returned.
func Collect(maxSize int, maxDuration time.Duration, in <-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
// Correct memory management
defer close(out)

var buffer []interface{}
timeout := time.After(duration)
timeout := time.After(maxDuration)
for {
lenBuffer := len(buffer)
select {
Expand All @@ -23,20 +24,21 @@ func Collect(max int, duration time.Duration, in <-chan interface{}) <-chan []in
return
} else if !open {
return
} else if lenBuffer < max-1 {
} else if lenBuffer < maxSize-1 {
// There is still room in the buffer
buffer = append(buffer, i)
} else {
// There is no room left in the buffer
out <- append(buffer, i)
buffer = nil
timeout = time.After(maxDuration)
}
case <-timeout:
if lenBuffer > 0 {
// We timed out with some items left in the buffer
out <- buffer
buffer = nil
timeout = time.After(duration)
timeout = time.After(maxDuration)
}
}
}
Expand Down
136 changes: 68 additions & 68 deletions collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,120 +9,120 @@ import (
// TestCollect tests the following cases of the Collect func
// 1. Closes when in closes
// 2. Remains Open if in remains open
// 3. Collects max and returns chunks
// 3. Collects max and returns immediately
// 4. Returns everything passed in if less than max after duration
// 5. After duration with nothing in the buffer, nothing is returned, channel remains open
func TestCollect(t *testing.T) {
// emit emits a slice of strings as a channel of interfaces
emit := func(ins []string, delay time.Duration) <-chan interface{} {
out := make(chan interface{})
go func() {
defer close(out)
for _, i := range ins {
time.Sleep(delay)
out <- i
}
}()
return out
}

const maxTestDuration = time.Second
type args struct {
max int
duration time.Duration
in []string
inDelay time.Duration
maxSize int
maxDuration time.Duration
in []interface{}
inDelay time.Duration
}
type want struct {
out [][]interface{}
close bool
out []interface{}
open bool
}
for _, test := range []struct {
name string
timeout time.Duration
args args
want want
name string
args args
want want
}{{
name: "closes when in closes",
timeout: 10 * time.Millisecond,
name: "out closes when in closes",
args: args{
max: 20,
duration: time.Second, // should close even if duration hasn't elapsed
in: nil,
maxSize: 20,
maxDuration: maxTestDuration,
in: nil,
inDelay: 0,
},
want: want{
out: nil,
close: true,
out: nil,
open: false,
},
}, {
name: "remains open if in remains open",
timeout: time.Second,
name: "out remains open if in remains open",
args: args{
max: 1,
duration: time.Second,
in: []string{"one second", "two seconds", "three seconds"},
inDelay: 750 * time.Millisecond,
maxSize: 2,
maxDuration: maxTestDuration,
in: []interface{}{1, 2, 3},
inDelay: (maxTestDuration / 2) - (100 * time.Millisecond),
},
want: want{
out: [][]interface{}{{"one second"}},
close: false,
out: []interface{}{[]interface{}{1, 2}},
open: true,
},
}, {
name: "collects max and returns chunks",
timeout: 60 * time.Millisecond,
name: "collects maxSize inputs and returns",
args: args{
max: 2,
duration: time.Second,
inDelay: 10 * time.Millisecond,
in: []string{"1", "2", "3", "4", "5"},
maxSize: 2,
maxDuration: maxTestDuration / 10 * 9,
inDelay: maxTestDuration / 10,
in: []interface{}{1, 2, 3, 4, 5},
},
want: want{
out: [][]interface{}{{"1", "2"}, {"3", "4"}, {"5"}},
close: true,
out: []interface{}{
[]interface{}{1, 2},
[]interface{}{3, 4},
[]interface{}{5},
},
open: false,
},
}, {
name: "returns chunk after duration",
timeout: 45 * time.Millisecond,
name: "collection returns after maxDuration with < maxSize",
args: args{
max: 10,
duration: 10 * time.Millisecond,
inDelay: 8 * time.Millisecond,
in: []string{"1", "2", "3", "4", "5"},
maxSize: 10,
maxDuration: maxTestDuration / 4,
inDelay: (maxTestDuration / 4) - (10 * time.Millisecond),
in: []interface{}{1, 2, 3, 4, 5},
},
want: want{
out: [][]interface{}{{"1"}, {"2"}, {"3"}, {"4"}},
close: false,
out: []interface{}{
[]interface{}{1},
[]interface{}{2},
[]interface{}{3},
[]interface{}{4},
},
open: true,
},
}} {
t.Run(test.name, func(t *testing.T) {
// Create the timeout
timeoutAfter := time.After(test.timeout)
// Create the in channel
in := make(chan interface{})
go func() {
defer close(in)
for _, i := range test.args.in {
time.Sleep(test.args.inDelay)
in <- i
}
}()

// Collect responses
collect := Collect(test.args.max, test.args.duration, emit(test.args.in, test.args.inDelay))
var outs [][]interface{}
var didClose bool
collect := Collect(test.args.maxSize, test.args.maxDuration, in)
timeout := time.After(maxTestDuration)
var outs []interface{}
var isOpen bool
loop:
for {
select {
case out, open := <-collect:
if !open {
didClose = true
isOpen = false
break loop
}
isOpen = true
outs = append(outs, out)
case <-timeoutAfter:
case <-timeout:
break loop
}
}

// Did we close? Were we expecting to close
if test.want.close && !didClose {
t.Errorf("expected out channel to close!")
} else if !test.want.close && didClose {
t.Errorf("did not expect out channel to close!")
// Expecting to close or stay open
if test.want.open != isOpen {
t.Errorf("%t = %t", test.want.open, isOpen)
}

// Compare outputs
// Expecting outputs
if !reflect.DeepEqual(test.want.out, outs) {
t.Errorf("%v != %v", test.want.out, outs)
}
Expand Down
27 changes: 27 additions & 0 deletions delay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package util

import (
"context"
"time"
)

// Delay forwards every value from in to the returned channel, pausing for
// duration after each forwarded value before reading the next one.
// Once the context is canceled the pause is skipped, but values keep being
// forwarded. The returned channel closes when in is closed.
func Delay(ctx context.Context, duration time.Duration, in <-chan interface{}) <-chan interface{} {
	delayed := make(chan interface{})

	// pause blocks for duration, or returns right away if ctx is done.
	pause := func() {
		select {
		case <-time.After(duration):
		case <-ctx.Done():
		}
	}

	go func() {
		defer close(delayed)
		for {
			item, open := <-in
			if !open {
				// in is closed, nothing left to forward
				return
			}
			// Forward first, then hold off on the next read
			delayed <- item
			pause()
		}
	}()
	return delayed
}
Loading

0 comments on commit b27f24f

Please sign in to comment.