Skip to content

Commit

Permalink
Change to dynamic select
Browse files Browse the repository at this point in the history
  • Loading branch information
ghjm committed Mar 21, 2022
1 parent 5bb97f1 commit 31dc36e
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ package stream

import (
"context"
"crypto/rand"
"math/big"
"reflect"
)

// Pipe accepts an incoming data channel and pipes it to the supplied
Expand Down Expand Up @@ -168,14 +167,6 @@ func Distribute[T any](ctx context.Context, in <-chan T, out ...chan<- T) {
}

for {
// Generate a random number with good entropy to determine which
// channel the data should be sent to.
// TODO: Determine if this hinders performance too much and if so,
// switch to something like fastrand which is used in the runtime
// for the select statement.
r, _ := rand.Int(rand.Reader, big.NewInt(int64(len(out))))
index := int(r.Int64()) % len(out)

select {
case <-ctx.Done():
return
Expand All @@ -188,11 +179,19 @@ func Distribute[T any](ctx context.Context, in <-chan T, out ...chan<- T) {
func() {
defer recover()

select {
case <-ctx.Done():
return
case out[index] <- v:
selectCases := make([]reflect.SelectCase, 0, len(out)+1)
for _, outc := range out {
selectCases = append(selectCases, reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(outc),
Send: reflect.ValueOf(v),
})
}
selectCases = append(selectCases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
})
_, _, _ = reflect.Select(selectCases)
}()
}

Expand Down

0 comments on commit 31dc36e

Please sign in to comment.