Skip to content

Commit

Permalink
feat!: introduce AwaitCompletion method to Sink (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Jan 25, 2025
1 parent 40e6f14 commit 209586b
Show file tree
Hide file tree
Showing 35 changed files with 575 additions and 301 deletions.
18 changes: 16 additions & 2 deletions aerospike/aerospike.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type PollingSource struct {
statement *aero.Statement
recordsChan chan *aero.Result
out chan any
logger *slog.Logger

logger *slog.Logger
}

var _ streams.Source = (*PollingSource)(nil)
Expand Down Expand Up @@ -78,7 +79,10 @@ func NewPollingSource(ctx context.Context, client *aero.Client,
logger: logger,
}

// periodically monitor the database for changes
go source.pollChanges(ctx)

// send new or updated records downstream
go source.streamRecords(ctx)

return source
Expand Down Expand Up @@ -155,7 +159,7 @@ loop:
close(ps.out)
}

// Via streams data to a specified operator and returns it.
// Via asynchronously streams data to the given Flow and returns it.
func (ps *PollingSource) Via(operator streams.Flow) streams.Flow {
flow.DoStream(ps, operator)
return operator
Expand Down Expand Up @@ -213,6 +217,8 @@ type Sink struct {
config SinkConfig
buf []*Record
in chan any

done chan struct{}
logger *slog.Logger
}

Expand All @@ -231,6 +237,7 @@ func NewSink(client *aero.Client, config SinkConfig, logger *slog.Logger) *Sink
client: client,
config: config,
in: make(chan any),
done: make(chan struct{}),
logger: logger,
}

Expand All @@ -246,6 +253,8 @@ func NewSink(client *aero.Client, config SinkConfig, logger *slog.Logger) *Sink
}

func (as *Sink) processStream() {
defer close(as.done) // signal data processing completion

var flushTickerChan <-chan time.Time
if as.config.BatchSize > 1 && as.config.BufferFlushInterval > 0 {
ticker := time.NewTicker(as.config.BufferFlushInterval)
Expand Down Expand Up @@ -326,3 +335,8 @@ func (as *Sink) flushBuffer() {
func (as *Sink) In() chan<- any {
return as.in
}

// AwaitCompletion blocks until the Sink connector has processed all the received data.
func (as *Sink) AwaitCompletion() {
<-as.done
}
15 changes: 13 additions & 2 deletions aws/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type S3Source struct {
config *S3SourceConfig
objectCh chan string
out chan any
logger *slog.Logger

logger *slog.Logger
}

var _ streams.Source = (*S3Source)(nil)
Expand Down Expand Up @@ -165,7 +166,7 @@ func (s *S3Source) getObjects(ctx context.Context) {
close(s.out)
}

// Via streams data to a specified operator and returns it.
// Via asynchronously streams data to the given Flow and returns it.
func (s *S3Source) Via(operator streams.Flow) streams.Flow {
flow.DoStream(s, operator)
return operator
Expand Down Expand Up @@ -202,6 +203,8 @@ type S3Sink struct {
client *s3.Client
config *S3SinkConfig
in chan any

done chan struct{}
logger *slog.Logger
}

Expand All @@ -228,6 +231,7 @@ func NewS3Sink(ctx context.Context, client *s3.Client,
client: client,
config: config,
in: make(chan any, config.Parallelism),
done: make(chan struct{}),
logger: logger,
}

Expand All @@ -240,6 +244,8 @@ func NewS3Sink(ctx context.Context, client *s3.Client,
// writeObjects writes incoming stream data elements to S3 using the
// configured parallelism.
func (s *S3Sink) writeObjects(ctx context.Context) {
defer close(s.done) // signal data processing completion

var wg sync.WaitGroup
for i := 0; i < s.config.Parallelism; i++ {
wg.Add(1)
Expand Down Expand Up @@ -291,3 +297,8 @@ func (s *S3Sink) writeObject(ctx context.Context, putObject *S3Object) error {
func (s *S3Sink) In() chan<- any {
return s.in
}

// AwaitCompletion blocks until the S3Sink connector has processed all the received data.
func (s *S3Sink) AwaitCompletion() {
<-s.done
}
24 changes: 21 additions & 3 deletions azure/blob_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type BlobStorageSource struct {
containerClient *container.Client
config *BlobStorageSourceConfig
out chan any
logger *slog.Logger

logger *slog.Logger
}

var _ streams.Source = (*BlobStorageSource)(nil)
Expand Down Expand Up @@ -124,7 +125,7 @@ func (s *BlobStorageSource) listBlobsHierarchy(ctx context.Context, prefix, mark
}
}

// Via streams data to a specified operator and returns it.
// Via asynchronously streams data to the given Flow and returns it.
func (s *BlobStorageSource) Via(operator streams.Flow) streams.Flow {
flow.DoStream(s, operator)
return operator
Expand Down Expand Up @@ -161,6 +162,8 @@ type BlobStorageSink struct {
client *azblob.Client
config *BlobStorageSinkConfig
in chan any

done chan struct{}
logger *slog.Logger
}

Expand All @@ -187,6 +190,7 @@ func NewBlobStorageSink(ctx context.Context, client *azblob.Client,
client: client,
config: config,
in: make(chan any, config.Parallelism),
done: make(chan struct{}),
logger: logger,
}

Expand All @@ -199,6 +203,8 @@ func NewBlobStorageSink(ctx context.Context, client *azblob.Client,
// uploadBlobs writes incoming stream data elements to Azure Blob storage
// using the configured parallelism.
func (s *BlobStorageSink) uploadBlobs(ctx context.Context) {
defer close(s.done) // signal data processing completion

var wg sync.WaitGroup
for i := 0; i < s.config.Parallelism; i++ {
wg.Add(1)
Expand Down Expand Up @@ -231,7 +237,13 @@ func (s *BlobStorageSink) uploadBlobs(ctx context.Context) {

// uploadBlob uploads a single blob to Azure Blob storage.
func (s *BlobStorageSink) uploadBlob(ctx context.Context, object *BlobStorageObject) error {
defer object.Data.Close()
defer func() {
if err := object.Data.Close(); err != nil {
s.logger.Warn("Failed to close blob storage object",
slog.String("key", object.Key),
slog.Any("error", err))
}
}()
_, err := s.client.UploadStream(ctx, s.config.Container, object.Key, object.Data,
s.config.UploadOptions)
return err
Expand All @@ -241,3 +253,9 @@ func (s *BlobStorageSink) uploadBlob(ctx context.Context, object *BlobStorageObj
func (s *BlobStorageSink) In() chan<- any {
return s.in
}

// AwaitCompletion blocks until the BlobStorageSink connector has completed
// processing all the received data.
func (s *BlobStorageSink) AwaitCompletion() {
<-s.done
}
8 changes: 2 additions & 6 deletions examples/aerospike/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@ func main() {
log.Fatal(err)
}

ctx, cancelFunc := context.WithCancel(context.Background())
timer := time.NewTimer(time.Minute)
go func() {
<-timer.C
cancelFunc()
}()
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

queryPolicy := aero.NewQueryPolicy()
queryPolicy.SendKey = true // send user defined key
Expand Down
3 changes: 0 additions & 3 deletions examples/aws/s3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"log"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
Expand Down Expand Up @@ -37,8 +36,6 @@ func main() {
source.
Via(mapObjects).
To(sink)

time.Sleep(time.Second)
}

func newS3Client(ctx context.Context) (*s3.Client, error) {
Expand Down
11 changes: 0 additions & 11 deletions examples/azure/blob/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"log"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/reugn/go-streams"
Expand Down Expand Up @@ -40,15 +39,11 @@ func main() {
Via(toBlobStorageFlow()).
To(newBlobSink(ctx, client))

awaitFlowCompletion()

// read blob data to stdout
newBlobSource(ctx, client).
Via(readBlobStorageFlow()).
To(extension.NewStdoutSink())

awaitFlowCompletion()

// clean up the container
cleanUp(ctx, client)
}
Expand Down Expand Up @@ -126,10 +121,4 @@ func cleanUp(ctx context.Context, client *azblob.Client) {
return blob.Key
}, 1)).
To(extension.NewStdoutSink())

awaitFlowCompletion()
}

func awaitFlowCompletion() {
time.Sleep(500 * time.Millisecond)
}
22 changes: 12 additions & 10 deletions examples/fs/main.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
package main

import (
"time"

ext "github.com/reugn/go-streams/extension"
"github.com/reugn/go-streams/flow"
)

func main() {
source := ext.NewFileSource("in.txt")
reverseMapFlow := flow.NewMap(reverse, 1)
reverseMapFlow := flow.NewMap(reverseString, 1)
newLineMapFlow := flow.NewMap(addNewLine, 1)
sink := ext.NewFileSink("out.txt")

source.
Via(reverseMapFlow).
Via(newLineMapFlow).
To(sink)

time.Sleep(time.Second)
}

var reverse = func(str string) string {
var reverse string
for i := len(str) - 1; i >= 0; i-- {
reverse += string(str[i])
func reverseString(s string) string {
runes := []rune(s)
for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
runes[i], runes[j] = runes[j], runes[i]
}
return reverse
return string(runes)
}

func addNewLine(s string) string {
return s + "\n"
}
3 changes: 0 additions & 3 deletions examples/gcp/storage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log"
"time"

"cloud.google.com/go/storage"
"github.com/reugn/go-streams/flow"
Expand Down Expand Up @@ -45,8 +44,6 @@ func main() {
source.
Via(mapObjects).
To(sink)

time.Sleep(time.Second)
}

var transform = func(object *connector.StorageObject) *connector.StorageObject {
Expand Down
9 changes: 2 additions & 7 deletions examples/net/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,8 @@ import (
// Test producer: nc -u 127.0.0.1 3434
// Test consumer: nc -u -l 3535
func main() {
ctx, cancelFunc := context.WithCancel(context.Background())

timer := time.NewTimer(time.Minute)
go func() {
<-timer.C
cancelFunc()
}()
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

source, err := ext.NewNetSource(ctx, ext.UDP, "127.0.0.1:3434")
if err != nil {
Expand Down
23 changes: 11 additions & 12 deletions examples/std/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"fmt"
"strconv"
"time"

Expand All @@ -17,29 +18,27 @@ func (msg *message) String() string {
}

func main() {

source := ext.NewChanSource(tickerChan(time.Second * 1))
mapFlow := flow.NewMap(addUTC, 1)
source := ext.NewChanSource(tickerChan(time.Second))
mapFlow := flow.NewMap(quote, 1)
sink := ext.NewStdoutSink()

source.
Via(mapFlow).
To(sink)
}

var addUTC = func(msg *message) *message {
msg.Msg += "-UTC"
func quote(msg *message) *message {
msg.Msg = fmt.Sprintf("%q", msg.Msg)
return msg
}

func tickerChan(repeat time.Duration) chan any {
ticker := time.NewTicker(repeat)
oc := ticker.C
nc := make(chan any)
func tickerChan(interval time.Duration) chan any {
outChan := make(chan any)
go func() {
for range oc {
nc <- &message{strconv.FormatInt(time.Now().UnixNano(), 10)}
ticker := time.NewTicker(interval)
for t := range ticker.C {
outChan <- &message{Msg: strconv.FormatInt(t.UnixMilli(), 10)}
}
}()
return nc
return outChan
}
2 changes: 1 addition & 1 deletion examples/websocket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (server *wsServer) init() {
}()

log.Print("http server started on :8080")
err := http.ListenAndServe(":8080", nil)
err := http.ListenAndServe(":8080", nil) //nolint:gosec
if err != nil {
log.Fatalf("Error in http.ListAndServe: %s", err)
}
Expand Down
Loading

0 comments on commit 209586b

Please sign in to comment.