Skip to content

Commit

Permalink
Reverting stupid constraint I created
Browse files Browse the repository at this point in the history
  • Loading branch information
benjivesterby committed Sep 13, 2022
1 parent 026f219 commit d9326a4
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 25 deletions.
30 changes: 11 additions & 19 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,14 @@ import (
"go.structs.dev/gen"
)

type readonly[T any] interface {
<-chan T | chan T
}

type writeonly[T any] interface {
chan<- T | chan T
}

// Pipe accepts an incoming data channel and pipes it to the supplied
// outgoing data channel.
//
// NOTE: Execute the Pipe function in a goroutine if parallel execution is
// desired. Canceling the context or closing the incoming channel is important
// to ensure that the goroutine is properly terminated.
func Pipe[In readonly[T], Out writeonly[T], T any](
ctx context.Context, in In, out Out,
func Pipe[T any](
ctx context.Context, in <-chan T, out chan<- T,
) {
ctx = _ctx(ctx)

Expand Down Expand Up @@ -62,9 +54,9 @@ type InterceptFunc[T, U any] func(context.Context, T) (U, bool)
// indicating whether the data should be forwarded to the output channel.
// The function is executed for each data item in the incoming channel as long
// as the context is not canceled or the incoming channel remains open.
func Intercept[In readonly[T], T, U any](
func Intercept[T, U any](
ctx context.Context,
in In,
in <-chan T,
fn InterceptFunc[T, U],
) <-chan U {
ctx = _ctx(ctx)
Expand Down Expand Up @@ -112,7 +104,7 @@ func Intercept[In readonly[T], T, U any](
// NOTE: The transfer takes place in a goroutine for each channel
// so ensuring that the context is canceled or the incoming channels
// are closed is important to ensure that the goroutine is terminated.
func FanIn[In readonly[T], T any](ctx context.Context, in ...In) <-chan T {
func FanIn[T any](ctx context.Context, in ...<-chan T) <-chan T {
ctx = _ctx(ctx)
out := make(chan T)

Expand Down Expand Up @@ -147,8 +139,8 @@ func FanIn[In readonly[T], T any](ctx context.Context, in ...In) <-chan T {
// NOTE: Execute the FanOut function in a goroutine if parallel execution is
// desired. Canceling the context or closing the incoming channel is important
// to ensure that the goroutine is properly terminated.
func FanOut[In readonly[T], Out writeonly[T], T any](
ctx context.Context, in In, out ...Out,
func FanOut[T any](
ctx context.Context, in <-chan T, out ...chan<- T,
) {
ctx = _ctx(ctx)

Expand Down Expand Up @@ -207,8 +199,8 @@ func FanOut[In readonly[T], Out writeonly[T], T any](
// NOTE: Execute the Distribute function in a goroutine if parallel execution is
// desired. Canceling the context or closing the incoming channel is important
// to ensure that the goroutine is properly terminated.
func Distribute[In readonly[T], Out writeonly[T], T any](
ctx context.Context, in In, out ...Out,
func Distribute[T any](
ctx context.Context, in <-chan T, out ...chan<- T,
) {
ctx = _ctx(ctx)

Expand Down Expand Up @@ -244,7 +236,7 @@ func Distribute[In readonly[T], Out writeonly[T], T any](

// Drain accepts a channel and drains the channel until the channel is closed
// or the context is canceled.
func Drain[U readonly[T], T any](ctx context.Context, in U) {
func Drain[T any](ctx context.Context, in <-chan T) {
ctx = _ctx(ctx)

go func() {
Expand All @@ -263,7 +255,7 @@ func Drain[U readonly[T], T any](ctx context.Context, in U) {

// Any accepts an incoming data channel and converts the channel to a readonly
// channel of the `any` type.
func Any[U readonly[T], T any](ctx context.Context, in U) <-chan any {
func Any[T any](ctx context.Context, in <-chan T) <-chan any {
return Intercept(ctx, in, func(_ context.Context, in T) (any, bool) {
return in, true
})
Expand Down
9 changes: 3 additions & 6 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func Test_Distribute_ZeroOut(t *testing.T) {
in := make(chan int)
defer close(in)

Distribute[<-chan int, chan<- int](ctx, in)
Distribute(ctx, in)
}

func Test_FanOut(t *testing.T) {
Expand Down Expand Up @@ -508,17 +508,14 @@ func Test_FanOut_ZeroOut(t *testing.T) {
in := make(chan int)
defer close(in)

FanOut[<-chan int, chan<- int](ctx, in)
FanOut(ctx, in)
}

func Test_FanIn_ZeroIn(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

in := make(chan int)
defer close(in)

FanIn[<-chan int](ctx)
FanIn[int](ctx)
}

func Test_Drain(t *testing.T) {
Expand Down

0 comments on commit d9326a4

Please sign in to comment.