Overview
This library was created to bring Akka Streams-style stream processing to Go, using Akka Streams as its reference.
You can combine units called Source, Flow, and Sink to create a complex stream pipeline.
Furthermore, if a processing function returns an error, the stream can be canceled at any point.
Below is how to write a complex graph.
/*
Wiring diagram for the calls below:

source ~> balance ~> merge ~> sink
          balance ~> merge
          balance ~> garbageSink
*/
// The source output feeds the balance input.
source.Out().Wire(balance.In())
// Balance outputs 0 and 1 are wired to merge inputs 0 and 1.
balance.Out()[0].Wire(merge.In()[0])
balance.Out()[1].Wire(merge.In()[1])
// Balance output 2 bypasses merge and goes straight to garbageSink.
balance.Out()[2].Wire(garbageSink.In())
// The merge output feeds sink.
merge.Out().Wire(sink.In())
-
ChannelSource
Elements sent to the channel become the source.
-
InfiniteSource
The configured elements are emitted endlessly.
-
SliceSource
The elements of the slice become the source.
-
BufferFlow
Flow with buffer.
-
ThrottleFlow
Passes one element per fixed time interval.
-
MapFlow
-
FilterFlow
-
FlowFromSinkAndSource
-
Sink
-
IgnoreSink
etc...
- Balance
- Broadcast
- Merge
$ go get github.com/BambooTuna/gooastream@v1.1.4
$ go get github.com/BambooTuna/gooastream@v1.1.9-beta
See the examples directory for the latest samples.
Simple Stream Sample
package examples
import (
"context"
"fmt"
"github.com/BambooTuna/gooastream/stream"
"sync"
"time"
)
// SimpleRunnableStream wires a slice source through a buffer flow into a
// printing sink, runs the pipeline under a one-second timeout context, and
// calls the cancel function returned by Run once the sink has handled every
// element.
func SimpleRunnableStream() {
	const total = 5

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// One WaitGroup slot per element; the sink callback releases one slot
	// for each element it receives.
	var remaining sync.WaitGroup
	remaining.Add(total)

	// Fill the slice with 0..total-1.
	elements := make([]interface{}, total)
	for idx := range elements {
		elements[idx] = idx
	}

	src := stream.NewSliceSource(elements)
	buffer := stream.NewBufferFlow(0)
	printer := stream.NewSink(func(elem interface{}) error {
		fmt.Println(elem)
		remaining.Done()
		return nil
	})

	wait, stop := src.Via(buffer).To(printer).Run(ctx)

	// Stop the running stream as soon as every element has been printed.
	go func() {
		remaining.Wait()
		stop()
	}()

	// blocking until stop (the cancel function returned by Run) is called
	wait()
}
Complex Stream Sample
package examples
import (
"context"
"fmt"
"github.com/BambooTuna/gooastream/builder"
"github.com/BambooTuna/gooastream/stream"
"sync"
"time"
)
// ComplexConstructedStream assembles a graph with the graph builder: a slice
// source feeds a three-way balance, two balance outputs are merged into one
// sink, and the third balance output goes to a separate sink. The graph runs
// under a one-second timeout context and is cancelled once the sinks have
// handled every element between them.
func ComplexConstructedStream() {
	const total = 10

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
	defer cancel()

	// One WaitGroup slot per element; whichever sink receives an element
	// releases its slot.
	var remaining sync.WaitGroup
	remaining.Add(total)

	// Fill the slice with 0..total-1.
	elements := make([]interface{}, total)
	for idx := range elements {
		elements[idx] = idx
	}

	// Both sinks handle elements the same way: print, then mark as done.
	printAndAck := func(elem interface{}) error {
		fmt.Println(elem)
		remaining.Done()
		return nil
	}

	graphBuilder := builder.NewGraphBuilder()
	source := graphBuilder.AddSource(stream.NewSliceSource(elements))
	balance := graphBuilder.AddBalance(builder.NewBalance(3))
	merge := graphBuilder.AddMerge(builder.NewMerge(2))
	garbageSink := graphBuilder.AddSink(stream.NewSink(printAndAck))
	sink := graphBuilder.AddSink(stream.NewSink(printAndAck))

	/*
		source ~> balance ~> merge ~> sink
		          balance ~> merge
		          balance ~> garbageSink
	*/
	source.Out().Wire(balance.In())
	balanceOuts := balance.Out()
	mergeIns := merge.In()
	balanceOuts[0].Wire(mergeIns[0])
	balanceOuts[1].Wire(mergeIns[1])
	balance.Out()[2].Wire(garbageSink.In())
	merge.Out().Wire(sink.In())

	wait, stop := graphBuilder.ToRunnable().Run(ctx)

	// Stop the running graph as soon as every element has been printed.
	go func() {
		remaining.Wait()
		stop()
	}()

	// blocking until stop (the cancel function returned by Run) is called
	wait()
}