Skip to content

Commit

Permalink
Correcting issue in fan-in
Browse files Browse the repository at this point in the history
  • Loading branch information
benjivesterby committed Jul 18, 2022
1 parent b8105ad commit 080466e
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 27 deletions.
10 changes: 0 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,6 @@ To install the package, run:
go get -u go.atomizer.io/stream@latest
```

## Importing

It is recommended to use the package via the following import:

`import . "go.atomizer.io/stream"`

Using the `.` import allows for functions to be called directly as if the
functions were in the same namespace without the need to append the package
name.

## Benchmarks

To execute the benchmarks, run the following command:
Expand Down
4 changes: 2 additions & 2 deletions scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"
"time"

. "go.structs.dev/gen"
"go.structs.dev/gen"
)

var emptyFn = func(context.Context, any) (any, bool) { return 0, true }
Expand All @@ -24,7 +24,7 @@ func ScalerTest[U ~[]T, T comparable](
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

testdata := Slice[T](data)
testdata := gen.Slice[T](data)

integers := testdata.Map()

Expand Down
14 changes: 10 additions & 4 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ package stream
import (
"context"
"reflect"
"sync"

. "go.structs.dev/gen"
"go.structs.dev/gen"
)

// Pipe accepts an incoming data channel and pipes it to the supplied
Expand Down Expand Up @@ -110,16 +111,21 @@ func FanIn[T any](ctx context.Context, in ...<-chan T) <-chan T {
return out
}

var wg sync.WaitGroup
defer func() {
go func() {
<-ctx.Done()
wg.Wait()
close(out)
}()
}()

wg.Add(len(in))
for _, i := range in {
// Pipe the result of the channel to the output channel.
go Pipe(ctx, i, out)
go func(i <-chan T) {
defer wg.Done()
Pipe(ctx, i, out)
}(i)
}

return out
Expand Down Expand Up @@ -177,7 +183,7 @@ func FanOut[T any](ctx context.Context, in <-chan T, out ...chan<- T) {
return
}

selectCases = Exclude(selectCases, selectCases[chosen])
selectCases = gen.Exclude(selectCases, selectCases[chosen])
}
}

Expand Down
4 changes: 2 additions & 2 deletions stream_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"testing"

. "go.structs.dev/gen"
"go.structs.dev/gen"
)

func Benchmark_Pipe(b *testing.B) {
Expand Down Expand Up @@ -122,7 +122,7 @@ func Benchmark_Scaler(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

testdata := Slice[int](Ints[int](100))
testdata := gen.Slice[int](Ints[int](100))

s := Scaler[int, int]{
Fn: func(_ context.Context, in int) (int, bool) {
Expand Down
20 changes: 11 additions & 9 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"
"time"

. "go.structs.dev/gen"
"go.structs.dev/gen"
)

func PipeTest[U ~[]T, T comparable](
Expand Down Expand Up @@ -95,7 +95,7 @@ func FanInTest[U ~[]T, T comparable](
out[i] = make(chan T)
}

fan := FanIn(ctx, ReadOnly(out...)...)
fan := FanIn(ctx, gen.ReadOnly(out...)...)

ichan := 0
cursor := 0
Expand All @@ -117,23 +117,25 @@ func FanInTest[U ~[]T, T comparable](
}

returned := make([]T, len(data))
for i := 0; i < len(data); i++ {
for i := 0; ; i++ {
select {
case <-ctx.Done():
t.Error("context cancelled")
return
case out, ok := <-fan:
if !ok {
if i != len(data)-1 {
t.Fatal("c2 closed prematurely")
if i != len(data) {
t.Fatalf("c2 closed prematurely; index %v", i)
}

return
}

returned[i] = out
}
}

diff := Diff(data, returned)
diff := gen.Diff(data, returned)
if len(diff) != 0 {
t.Errorf("unexpected diff: %v", diff)
}
Expand Down Expand Up @@ -222,7 +224,7 @@ func Test_Intercept_ChangeType(t *testing.T) {

out := Intercept(
ctx,
Slice[int](integers).Chan(ctx),
gen.Slice[int](integers).Chan(ctx),
func(_ context.Context, in int) (bool, bool) {
return in%2 == 0, true
})
Expand Down Expand Up @@ -355,7 +357,7 @@ func DistributeTest[U ~[]T, T comparable](

c1, c2, c3 := make(chan T), make(chan T), make(chan T)

go Distribute(ctx, Slice[T](data).Chan(ctx), c1, c2, c3)
go Distribute(ctx, gen.Slice[T](data).Chan(ctx), c1, c2, c3)

c1total, c2total, c3total := 0, 0, 0
for i := 0; i < len(data); i++ {
Expand Down Expand Up @@ -451,7 +453,7 @@ func Test_FanOut(t *testing.T) {
var c4 chan int
data := Ints[int](1000)

go FanOut(ctx, Slice[int](data).Chan(ctx), c1, c2, c3, c4)
go FanOut(ctx, gen.Slice[int](data).Chan(ctx), c1, c2, c3, c4)

seen := make(map[int]int)
for i := 0; i < len(data)*3; i++ {
Expand Down

0 comments on commit 080466e

Please sign in to comment.