From 209586bb9914abdc50bf8653a353fd981fe99e1e Mon Sep 17 00:00:00 2001 From: "Eugene R." Date: Sat, 25 Jan 2025 14:23:32 +0200 Subject: [PATCH] feat!: introduce AwaitCompletion method to Sink (#154) --- aerospike/aerospike.go | 18 +++++- aws/s3.go | 15 ++++- azure/blob_storage.go | 24 +++++++- examples/aerospike/main.go | 8 +-- examples/aws/s3/main.go | 3 - examples/azure/blob/main.go | 11 ---- examples/fs/main.go | 22 ++++--- examples/gcp/storage/main.go | 3 - examples/net/main.go | 9 +-- examples/std/main.go | 23 ++++--- examples/websocket/main.go | 2 +- extension/chan.go | 7 ++- extension/fs.go | 115 ++++++++++++++++++++++------------- extension/net.go | 112 ++++++++++++++++++++-------------- extension/std.go | 57 ++++++++++------- flow/batch.go | 6 +- flow/filter.go | 6 +- flow/flat_map.go | 6 +- flow/keyed.go | 6 +- flow/map.go | 6 +- flow/pass_through.go | 6 +- flow/reduce.go | 6 +- flow/session_window.go | 6 +- flow/sliding_window.go | 6 +- flow/throttler.go | 6 +- flow/tumbling_window.go | 6 +- gcp/storage.go | 22 ++++++- kafka/kafka_sarama.go | 41 +++++++++---- nats/nats_jetstream.go | 34 ++++++++--- nats/nats_streaming.go | 44 ++++++++++---- pulsar/pulsar.go | 44 ++++++++++---- redis/redis_pubsub.go | 42 +++++++++---- redis/redis_stream.go | 41 +++++++++---- streams.go | 21 +++++++ websocket/web_socket.go | 92 +++++++++++++++++----------- 35 files changed, 575 insertions(+), 301 deletions(-) diff --git a/aerospike/aerospike.go b/aerospike/aerospike.go index f2fa743..1db900a 100644 --- a/aerospike/aerospike.go +++ b/aerospike/aerospike.go @@ -42,7 +42,8 @@ type PollingSource struct { statement *aero.Statement recordsChan chan *aero.Result out chan any - logger *slog.Logger + + logger *slog.Logger } var _ streams.Source = (*PollingSource)(nil) @@ -78,7 +79,10 @@ func NewPollingSource(ctx context.Context, client *aero.Client, logger: logger, } + // periodically monitor the database for changes go source.pollChanges(ctx) + + // send new or updated records downstream go source.streamRecords(ctx) return source @@ -155,7 +159,7 @@ loop: close(ps.out) } -// Via streams data to a specified operator and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (ps *PollingSource) Via(operator streams.Flow) streams.Flow { flow.DoStream(ps, operator) return operator @@ -213,6 +217,8 @@ type Sink struct { config SinkConfig buf []*Record in chan any + + done chan struct{} logger *slog.Logger } @@ -231,6 +237,7 @@ func NewSink(client *aero.Client, config SinkConfig, logger *slog.Logger) *Sink client: client, config: config, in: make(chan any), + done: make(chan struct{}), logger: logger, } @@ -246,6 +253,8 @@ func NewSink(client *aero.Client, config SinkConfig, logger *slog.Logger) *Sink } func (as *Sink) processStream() { + defer close(as.done) // signal data processing completion + var flushTickerChan <-chan time.Time if as.config.BatchSize > 1 && as.config.BufferFlushInterval > 0 { ticker := time.NewTicker(as.config.BufferFlushInterval) @@ -326,3 +335,8 @@ func (as *Sink) flushBuffer() { func (as *Sink) In() chan<- any { return as.in } + +// AwaitCompletion blocks until the Sink connector has processed all the received data. 
+func (as *Sink) AwaitCompletion() { + <-as.done +} diff --git a/aws/s3.go b/aws/s3.go index c808a3a..f8771a5 100644 --- a/aws/s3.go +++ b/aws/s3.go @@ -38,7 +38,8 @@ type S3Source struct { config *S3SourceConfig objectCh chan string out chan any - logger *slog.Logger + + logger *slog.Logger } var _ streams.Source = (*S3Source)(nil) @@ -165,7 +166,7 @@ func (s *S3Source) getObjects(ctx context.Context) { close(s.out) } -// Via streams data to a specified operator and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (s *S3Source) Via(operator streams.Flow) streams.Flow { flow.DoStream(s, operator) return operator @@ -202,6 +203,8 @@ type S3Sink struct { client *s3.Client config *S3SinkConfig in chan any + + done chan struct{} logger *slog.Logger } @@ -228,6 +231,7 @@ func NewS3Sink(ctx context.Context, client *s3.Client, client: client, config: config, in: make(chan any, config.Parallelism), + done: make(chan struct{}), logger: logger, } @@ -240,6 +244,8 @@ func NewS3Sink(ctx context.Context, client *s3.Client, // writeObjects writes incoming stream data elements to S3 using the // configured parallelism. func (s *S3Sink) writeObjects(ctx context.Context) { + defer close(s.done) // signal data processing completion + var wg sync.WaitGroup for i := 0; i < s.config.Parallelism; i++ { wg.Add(1) @@ -291,3 +297,8 @@ func (s *S3Sink) writeObject(ctx context.Context, putObject *S3Object) error { func (s *S3Sink) In() chan<- any { return s.in } + +// AwaitCompletion blocks until the S3Sink connector has processed all the received data. +func (s *S3Sink) AwaitCompletion() { + <-s.done +} diff --git a/azure/blob_storage.go b/azure/blob_storage.go index 3735ffa..0ee2836 100644 --- a/azure/blob_storage.go +++ b/azure/blob_storage.go @@ -31,7 +31,8 @@ type BlobStorageSource struct { containerClient *container.Client config *BlobStorageSourceConfig out chan any - logger *slog.Logger + + logger *slog.Logger } var _ streams.Source = (*BlobStorageSource)(nil) @@ -124,7 +125,7 @@ func (s *BlobStorageSource) listBlobsHierarchy(ctx context.Context, prefix, mark } } -// Via streams data to a specified operator and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (s *BlobStorageSource) Via(operator streams.Flow) streams.Flow { flow.DoStream(s, operator) return operator @@ -161,6 +162,8 @@ type BlobStorageSink struct { client *azblob.Client config *BlobStorageSinkConfig in chan any + + done chan struct{} logger *slog.Logger } @@ -187,6 +190,7 @@ func NewBlobStorageSink(ctx context.Context, client *azblob.Client, client: client, config: config, in: make(chan any, config.Parallelism), + done: make(chan struct{}), logger: logger, } @@ -199,6 +203,8 @@ func NewBlobStorageSink(ctx context.Context, client *azblob.Client, // uploadBlobs writes incoming stream data elements to Azure Blob storage // using the configured parallelism. func (s *BlobStorageSink) uploadBlobs(ctx context.Context) { + defer close(s.done) // signal data processing completion + var wg sync.WaitGroup for i := 0; i < s.config.Parallelism; i++ { wg.Add(1) @@ -231,7 +237,13 @@ func (s *BlobStorageSink) uploadBlobs(ctx context.Context) { // uploadBlob uploads a single blob to Azure Blob storage. 
func (s *BlobStorageSink) uploadBlob(ctx context.Context, object *BlobStorageObject) error { - defer object.Data.Close() + defer func() { + if err := object.Data.Close(); err != nil { + s.logger.Warn("Failed to close blob storage object", + slog.String("key", object.Key), + slog.Any("error", err)) + } + }() _, err := s.client.UploadStream(ctx, s.config.Container, object.Key, object.Data, s.config.UploadOptions) return err @@ -241,3 +253,9 @@ func (s *BlobStorageSink) uploadBlob(ctx context.Context, object *BlobStorageObj func (s *BlobStorageSink) In() chan<- any { return s.in } + +// AwaitCompletion blocks until the BlobStorageSink connector has completed +// processing all the received data. +func (s *BlobStorageSink) AwaitCompletion() { + <-s.done +} diff --git a/examples/aerospike/main.go b/examples/aerospike/main.go index c3092ca..8bf5edf 100644 --- a/examples/aerospike/main.go +++ b/examples/aerospike/main.go @@ -16,12 +16,8 @@ func main() { log.Fatal(err) } - ctx, cancelFunc := context.WithCancel(context.Background()) - timer := time.NewTimer(time.Minute) - go func() { - <-timer.C - cancelFunc() - }() + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() queryPolicy := aero.NewQueryPolicy() queryPolicy.SendKey = true // send user defined key diff --git a/examples/aws/s3/main.go b/examples/aws/s3/main.go index 2f5ec8f..a2b357d 100644 --- a/examples/aws/s3/main.go +++ b/examples/aws/s3/main.go @@ -5,7 +5,6 @@ import ( "fmt" "log" "strings" - "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -37,8 +36,6 @@ func main() { source. Via(mapObjects). To(sink) - - time.Sleep(time.Second) } func newS3Client(ctx context.Context) (*s3.Client, error) { diff --git a/examples/azure/blob/main.go b/examples/azure/blob/main.go index 8a6e7fa..fab2d97 100644 --- a/examples/azure/blob/main.go +++ b/examples/azure/blob/main.go @@ -6,7 +6,6 @@ import ( "io" "log" "strings" - "time" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" "github.com/reugn/go-streams" @@ -40,15 +39,11 @@ func main() { Via(toBlobStorageFlow()). To(newBlobSink(ctx, client)) - awaitFlowCompletion() - // read blob data to stdout newBlobSource(ctx, client). Via(readBlobStorageFlow()). To(extension.NewStdoutSink()) - awaitFlowCompletion() - // clean up the container cleanUp(ctx, client) } @@ -126,10 +121,4 @@ func cleanUp(ctx context.Context, client *azblob.Client) { return blob.Key }, 1)). To(extension.NewStdoutSink()) - - awaitFlowCompletion() -} - -func awaitFlowCompletion() { - time.Sleep(500 * time.Millisecond) } diff --git a/examples/fs/main.go b/examples/fs/main.go index 86e8803..5395bea 100644 --- a/examples/fs/main.go +++ b/examples/fs/main.go @@ -1,28 +1,30 @@ package main import ( - "time" - ext "github.com/reugn/go-streams/extension" "github.com/reugn/go-streams/flow" ) func main() { source := ext.NewFileSource("in.txt") - reverseMapFlow := flow.NewMap(reverse, 1) + reverseMapFlow := flow.NewMap(reverseString, 1) + newLineMapFlow := flow.NewMap(addNewLine, 1) sink := ext.NewFileSink("out.txt") source. Via(reverseMapFlow). + Via(newLineMapFlow). 
To(sink) - - time.Sleep(time.Second) } -var reverse = func(str string) string { - var reverse string - for i := len(str) - 1; i >= 0; i-- { - reverse += string(str[i]) +func reverseString(s string) string { + runes := []rune(s) + for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 { + runes[i], runes[j] = runes[j], runes[i] } - return reverse + return string(runes) +} + +func addNewLine(s string) string { + return s + "\n" } diff --git a/examples/gcp/storage/main.go b/examples/gcp/storage/main.go index df86237..58ea96e 100644 --- a/examples/gcp/storage/main.go +++ b/examples/gcp/storage/main.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log" - "time" "cloud.google.com/go/storage" "github.com/reugn/go-streams/flow" @@ -45,8 +44,6 @@ func main() { source. Via(mapObjects). To(sink) - - time.Sleep(time.Second) } var transform = func(object *connector.StorageObject) *connector.StorageObject { diff --git a/examples/net/main.go b/examples/net/main.go index 060ee72..c7c5b00 100644 --- a/examples/net/main.go +++ b/examples/net/main.go @@ -13,13 +13,8 @@ import ( // Test producer: nc -u 127.0.0.1 3434 // Test consumer: nc -u -l 3535 func main() { - ctx, cancelFunc := context.WithCancel(context.Background()) - - timer := time.NewTimer(time.Minute) - go func() { - <-timer.C - cancelFunc() - }() + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() source, err := ext.NewNetSource(ctx, ext.UDP, "127.0.0.1:3434") if err != nil { diff --git a/examples/std/main.go b/examples/std/main.go index fa489c2..56e95f3 100644 --- a/examples/std/main.go +++ b/examples/std/main.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "strconv" "time" @@ -17,9 +18,8 @@ func (msg *message) String() string { } func main() { - - source := ext.NewChanSource(tickerChan(time.Second * 1)) - mapFlow := flow.NewMap(addUTC, 1) + source := ext.NewChanSource(tickerChan(time.Second)) + mapFlow := flow.NewMap(quote, 1) sink := ext.NewStdoutSink() source. @@ -27,19 +27,18 @@ func main() { To(sink) } -var addUTC = func(msg *message) *message { - msg.Msg += "-UTC" +func quote(msg *message) *message { + msg.Msg = fmt.Sprintf("%q", msg.Msg) return msg } -func tickerChan(repeat time.Duration) chan any { - ticker := time.NewTicker(repeat) - oc := ticker.C - nc := make(chan any) +func tickerChan(interval time.Duration) chan any { + outChan := make(chan any) go func() { - for range oc { - nc <- &message{strconv.FormatInt(time.Now().UnixNano(), 10)} + ticker := time.NewTicker(interval) + for t := range ticker.C { + outChan <- &message{Msg: strconv.FormatInt(t.UnixMilli(), 10)} } }() - return nc + return outChan } diff --git a/examples/websocket/main.go b/examples/websocket/main.go index 2218df2..2c3743a 100644 --- a/examples/websocket/main.go +++ b/examples/websocket/main.go @@ -47,7 +47,7 @@ func (server *wsServer) init() { }() log.Print("http server started on :8080") - err := http.ListenAndServe(":8080", nil) + err := http.ListenAndServe(":8080", nil) //nolint:gosec if err != nil { log.Fatalf("Error in http.ListAndServe: %s", err) } diff --git a/extension/chan.go b/extension/chan.go index e5347d6..ec02d4d 100644 --- a/extension/chan.go +++ b/extension/chan.go @@ -18,7 +18,7 @@ func NewChanSource(in chan any) *ChanSource { return &ChanSource{in} } -// Via streams data to a specified operator and returns it. +// Via asynchronously streams data to the given Flow and returns it. 
func (cs *ChanSource) Via(operator streams.Flow) streams.Flow { flow.DoStream(cs, operator) return operator @@ -46,3 +46,8 @@ func NewChanSink(out chan any) *ChanSink { func (ch *ChanSink) In() chan<- any { return ch.Out } + +// AwaitCompletion is a no-op for the ChanSink. +func (ch *ChanSink) AwaitCompletion() { + // no-op +} diff --git a/extension/fs.go b/extension/fs.go index 9429363..0ff011c 100644 --- a/extension/fs.go +++ b/extension/fs.go @@ -2,12 +2,12 @@ package extension import ( "bufio" + "fmt" "log" "os" "github.com/reugn/go-streams" "github.com/reugn/go-streams/flow" - "github.com/reugn/go-streams/internal/ospkg" ) // FileSource represents an inbound connector that creates a stream of @@ -21,42 +21,43 @@ var _ streams.Source = (*FileSource)(nil) // NewFileSource returns a new FileSource connector. func NewFileSource(fileName string) *FileSource { - source := &FileSource{ + fileSource := &FileSource{ fileName: fileName, in: make(chan any), } - source.init() - return source + + // asynchronously send file data downstream + go fileSource.process() + + return fileSource } -func (fs *FileSource) init() { - go func() { - file, err := os.Open(fs.fileName) - if err != nil { - log.Fatalf("FileSource failed to open the file %s", fs.fileName) - } - defer file.Close() - reader := bufio.NewReader(file) - for { - lineBytes, isPrefix, err := reader.ReadLine() - if err != nil { - close(fs.in) - break - } - - var element string - if isPrefix { - element = string(lineBytes) - } else { - element = string(lineBytes) + ospkg.NewLine - } - - fs.in <- element +func (fs *FileSource) process() { + file, err := os.Open(fs.fileName) + if err != nil { + log.Fatalf("FileSource failed to open the file %s: %v", fs.fileName, err) + } + defer func() { + if err := file.Close(); err != nil { + log.Printf("FileSource failed to close the file %s: %v", fs.fileName, err) } }() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + // send the file line downstream + fs.in <- scanner.Text() + } + + // check for errors that occurred during scanning + if err := scanner.Err(); err != nil { + log.Printf("FileSource scanner error: %v", err) + } + + close(fs.in) } -// Via streams data to a specified operator and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (fs *FileSource) Via(operator streams.Flow) streams.Flow { flow.DoStream(fs, operator) return operator @@ -72,37 +73,65 @@ func (fs *FileSource) Out() <-chan any { type FileSink struct { fileName string in chan any + done chan struct{} } var _ streams.Sink = (*FileSink)(nil) // NewFileSink returns a new FileSink connector. 
func NewFileSink(fileName string) *FileSink { - sink := &FileSink{ + fileSink := &FileSink{ fileName: fileName, in: make(chan any), + done: make(chan struct{}), } - sink.init() - return sink + + // asynchronously process stream data + go fileSink.process() + + return fileSink } -func (fs *FileSink) init() { - go func() { - file, err := os.OpenFile(fs.fileName, os.O_CREATE|os.O_WRONLY, 0600) - if err != nil { - log.Fatalf("FileSink failed to open the file %s", fs.fileName) - } - defer file.Close() - for element := range fs.in { - _, err = file.WriteString(element.(string)) - if err != nil { - log.Fatalf("FileSink failed to write to the file %s", fs.fileName) - } +func (fs *FileSink) process() { + defer close(fs.done) + + file, err := os.Create(fs.fileName) + if err != nil { + log.Fatalf("FileSink failed to open the file %s: %v", fs.fileName, err) + } + defer func() { + if err := file.Close(); err != nil { + log.Printf("FileSink failed to close the file %s: %v", fs.fileName, err) } }() + + for element := range fs.in { + var stringElement string + switch v := element.(type) { + case string: + stringElement = v + case fmt.Stringer: + stringElement = v.String() + default: + log.Printf("FileSink received an unsupported type %T, discarding", v) + continue + } + + // Write the processed string element to the file. If an error occurs, + // terminate the sink. + if _, err := file.WriteString(stringElement); err != nil { + log.Fatalf("FileSink failed to write to the file %s: %v", fs.fileName, err) + } + } } // In returns the input channel of the FileSink connector. func (fs *FileSink) In() chan<- any { return fs.in } + +// AwaitCompletion blocks until the FileSink has completed processing and +// flushing all data to the file. +func (fs *FileSink) AwaitCompletion() { + <-fs.done +} diff --git a/extension/net.go b/extension/net.go index 5b04c43..b67a494 100644 --- a/extension/net.go +++ b/extension/net.go @@ -3,7 +3,7 @@ package extension import ( "bufio" "context" - "errors" + "fmt" "log" "net" "time" @@ -12,14 +12,13 @@ import ( "github.com/reugn/go-streams/flow" ) -// ConnType represents a connection type. +// ConnType represents a network connection type. type ConnType string const ( - // TCP connection type + // TCP connection type. TCP ConnType = "tcp" - - // UDP connection type + // UDP connection type. UDP ConnType = "udp" ) @@ -36,31 +35,42 @@ var _ streams.Source = (*NetSource)(nil) // NewNetSource returns a new NetSource connector. 
func NewNetSource(ctx context.Context, connType ConnType, address string) (*NetSource, error) { - var err error - var conn net.Conn - var listener net.Listener - out := make(chan any) + var ( + conn net.Conn + listener net.Listener + ) + out := make(chan any) switch connType { case TCP: - addr, _ := net.ResolveTCPAddr(string(connType), address) + addr, err := net.ResolveTCPAddr(string(connType), address) + if err != nil { + return nil, fmt.Errorf("failed to ResolveTCPAddr: %w", err) + } + listener, err = net.ListenTCP(string(connType), addr) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to ListenTCP: %w", err) } + go acceptConnections(listener, out) case UDP: - addr, _ := net.ResolveUDPAddr(string(connType), address) + addr, err := net.ResolveUDPAddr(string(connType), address) + if err != nil { + return nil, fmt.Errorf("failed to ResolveUDPAddr: %w", err) + } + conn, err = net.ListenUDP(string(connType), addr) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to ListenUDP: %w", err) } + go handleConnection(conn, out) default: - return nil, errors.New("invalid connection type") + return nil, fmt.Errorf("invalid connection type: %s", connType) } - source := &NetSource{ + netSource := &NetSource{ ctx: ctx, conn: conn, listener: listener, @@ -68,19 +78,22 @@ func NewNetSource(ctx context.Context, connType ConnType, address string) (*NetS out: out, } - go source.listenCtx() - return source, nil + // start a goroutine to await the context cancellation and then + // shut down the network source + go netSource.awaitShutdown() + + return netSource, nil } -func (ns *NetSource) listenCtx() { +func (ns *NetSource) awaitShutdown() { <-ns.ctx.Done() if ns.conn != nil { - ns.conn.Close() + _ = ns.conn.Close() } if ns.listener != nil { - ns.listener.Close() + _ = ns.listener.Close() } close(ns.out) @@ -89,7 +102,7 @@ func (ns *NetSource) listenCtx() { // acceptConnections accepts new TCP connections. func acceptConnections(listener net.Listener, out chan<- any) { for { - // accept a new connection + // block and return the next connection to the listener conn, err := listener.Accept() if err != nil { log.Printf("listener.Accept() failed with: %s", err) @@ -101,7 +114,8 @@ func acceptConnections(listener net.Listener, out chan<- any) { } } -// handleConnection handles new connections. +// handleConnection manages a single network connection, reading newline-delimited data +// from it and sending it to the provided output channel. func handleConnection(conn net.Conn, out chan<- any) { log.Printf("NetSource connected on: %v", conn.LocalAddr()) reader := bufio.NewReader(conn) @@ -118,11 +132,13 @@ func handleConnection(conn net.Conn, out chan<- any) { } } - log.Printf("Closing the NetSource connection %v", conn) - conn.Close() + log.Printf("Closing the NetSource connection %v", conn.LocalAddr()) + if err := conn.Close(); err != nil { + log.Printf("Failed to close connection %v", conn.LocalAddr()) + } } -// Via streams data to a specified operator and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (ns *NetSource) Via(operator streams.Flow) streams.Flow { flow.DoStream(ns, operator) return operator @@ -138,52 +154,60 @@ type NetSink struct { conn net.Conn connType ConnType in chan any + done chan struct{} } var _ streams.Sink = (*NetSink)(nil) // NewNetSink returns a new NetSink connector. 
func NewNetSink(connType ConnType, address string) (*NetSink, error) { - var err error - var conn net.Conn - - conn, err = net.DialTimeout(string(connType), address, time.Second*10) + conn, err := net.DialTimeout(string(connType), address, 10*time.Second) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to connect: %w", err) } + log.Printf("NetSink connected on: %v", conn.LocalAddr()) - sink := &NetSink{ + netSink := &NetSink{ conn: conn, connType: connType, in: make(chan any), + done: make(chan struct{}), } - go sink.init() - return sink, nil + // asynchronously process stream data + go netSink.process() + + return netSink, nil } -// init starts the stream processing loop -func (ns *NetSink) init() { - log.Printf("NetSink connected on: %v", ns.conn.LocalAddr()) - writer := bufio.NewWriter(ns.conn) +func (ns *NetSink) process() { + defer close(ns.done) for msg := range ns.in { - switch m := msg.(type) { + switch message := msg.(type) { case string: - _, err := writer.WriteString(m) - if err == nil { - writer.Flush() + if _, err := ns.conn.Write([]byte(message)); err != nil { + log.Printf("NetSink failed to write to connection %v: %v", + ns.conn.LocalAddr(), err) } default: - log.Printf("NetSink unsupported message type: %T", m) + log.Printf("NetSink unsupported message type: %T", message) } } - log.Printf("Closing the NetSink connection %v", ns.conn) - ns.conn.Close() + log.Printf("Closing the NetSink connection %v", ns.conn.LocalAddr()) + if err := ns.conn.Close(); err != nil { + log.Printf("Failed to close connection %v", ns.conn.LocalAddr()) + } } // In returns the input channel of the NetSink connector. func (ns *NetSink) In() chan<- any { return ns.in } + +// AwaitCompletion blocks until the NetSink has processed all received data, +// closed the connection, and released all resources. +func (ns *NetSink) AwaitCompletion() { + <-ns.done +} diff --git a/extension/std.go b/extension/std.go index c1d07e8..adb6679 100644 --- a/extension/std.go +++ b/extension/std.go @@ -9,27 +9,30 @@ import ( // StdoutSink represents a simple outbound connector that writes // streaming data to standard output. type StdoutSink struct { - in chan any + in chan any + done chan struct{} } var _ streams.Sink = (*StdoutSink)(nil) // NewStdoutSink returns a new StdoutSink connector. func NewStdoutSink() *StdoutSink { - sink := &StdoutSink{ - in: make(chan any), + stdoutSink := &StdoutSink{ + in: make(chan any), + done: make(chan struct{}), } - sink.init() - return sink + // asynchronously process stream data + go stdoutSink.process() + + return stdoutSink } -func (stdout *StdoutSink) init() { - go func() { - for elem := range stdout.in { - fmt.Println(elem) - } - }() +func (stdout *StdoutSink) process() { + defer close(stdout.done) + for elem := range stdout.in { + fmt.Println(elem) + } } // In returns the input channel of the StdoutSink connector. @@ -37,6 +40,11 @@ func (stdout *StdoutSink) In() chan<- any { return stdout.in } +// AwaitCompletion blocks until the StdoutSink has processed all received data. +func (stdout *StdoutSink) AwaitCompletion() { + <-stdout.done +} + // IgnoreSink represents a simple outbound connector that discards // all elements of a stream. type IgnoreSink struct { @@ -47,26 +55,31 @@ var _ streams.Sink = (*IgnoreSink)(nil) // NewIgnoreSink returns a new IgnoreSink connector. 
func NewIgnoreSink() *IgnoreSink { - sink := &IgnoreSink{ + ignoreSink := &IgnoreSink{ in: make(chan any), } - sink.init() - return sink + // asynchronously process stream data + go ignoreSink.process() + + return ignoreSink } -func (ignore *IgnoreSink) init() { - go func() { - for { - _, ok := <-ignore.in - if !ok { - break - } +func (ignore *IgnoreSink) process() { + for { + _, ok := <-ignore.in + if !ok { + break } - }() + } } // In returns the input channel of the IgnoreSink connector. func (ignore *IgnoreSink) In() chan<- any { return ignore.in } + +// AwaitCompletion is a no-op for the IgnoreSink. +func (ignore *IgnoreSink) AwaitCompletion() { + // no-op +} diff --git a/flow/batch.go b/flow/batch.go index 36fbc2f..5dabe84 100644 --- a/flow/batch.go +++ b/flow/batch.go @@ -46,15 +46,17 @@ func NewBatch[T any](maxBatchSize int, timeInterval time.Duration) *Batch[T] { return batchFlow } -// Via streams data to a specified Flow and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (b *Batch[T]) Via(flow streams.Flow) streams.Flow { go b.transmit(flow) return flow } -// To streams data to a specified Sink. +// To streams data to the given Sink and blocks until the Sink has completed +// processing all data. func (b *Batch[T]) To(sink streams.Sink) { b.transmit(sink) + sink.AwaitCompletion() } // Out returns the output channel of the Batch operator. diff --git a/flow/filter.go b/flow/filter.go index 14cdc5e..2e50abf 100644 --- a/flow/filter.go +++ b/flow/filter.go @@ -49,15 +49,17 @@ func NewFilter[T any](filterPredicate FilterPredicate[T], parallelism int) *Filt return filter } -// Via streams data to a specified Flow and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (f *Filter[T]) Via(flow streams.Flow) streams.Flow { go f.transmit(flow) return flow } -// To streams data to a specified Sink. +// To streams data to the given Sink and blocks until the Sink has completed +// processing all data. func (f *Filter[T]) To(sink streams.Sink) { f.transmit(sink) + sink.AwaitCompletion() } // Out returns the output channel of the Filter operator. diff --git a/flow/flat_map.go b/flow/flat_map.go index 50be4ba..4c5e941 100644 --- a/flow/flat_map.go +++ b/flow/flat_map.go @@ -47,15 +47,17 @@ func NewFlatMap[T, R any](flatMapFunction FlatMapFunction[T, R], parallelism int return flatMap } -// Via streams data to a specified Flow and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (fm *FlatMap[T, R]) Via(flow streams.Flow) streams.Flow { go fm.transmit(flow) return flow } -// To streams data to a specified Sink. +// To streams data to the given Sink and blocks until the Sink has completed +// processing all data. func (fm *FlatMap[T, R]) To(sink streams.Sink) { fm.transmit(sink) + sink.AwaitCompletion() } // Out returns the output channel of the FlatMap operator. diff --git a/flow/keyed.go b/flow/keyed.go index 6d74854..eeff60a 100644 --- a/flow/keyed.go +++ b/flow/keyed.go @@ -70,15 +70,17 @@ func (k *Keyed[K, V]) stream() { close(k.out) } -// Via streams data to a specified Flow and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (k *Keyed[K, V]) Via(flow streams.Flow) streams.Flow { go k.transmit(flow) return flow } -// To streams data to a specified Sink. +// To streams data to the given Sink and blocks until the Sink has completed +// processing all data. 
func (k *Keyed[K, V]) To(sink streams.Sink) { k.transmit(sink) + sink.AwaitCompletion() } // Out returns the output channel of the Keyed operator. diff --git a/flow/map.go b/flow/map.go index 9e6667e..24d5ac7 100644 --- a/flow/map.go +++ b/flow/map.go @@ -47,15 +47,17 @@ func NewMap[T, R any](mapFunction MapFunction[T, R], parallelism int) *Map[T, R] return mapFlow } -// Via streams data to a specified Flow and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (m *Map[T, R]) Via(flow streams.Flow) streams.Flow { go m.transmit(flow) return flow } -// To streams data to a specified Sink. +// To streams data to the given Sink and blocks until the Sink has completed +// processing all data. func (m *Map[T, R]) To(sink streams.Sink) { m.transmit(sink) + sink.AwaitCompletion() } // Out returns the output channel of the Map operator. diff --git a/flow/pass_through.go b/flow/pass_through.go index 51a8884..580fb09 100644 --- a/flow/pass_through.go +++ b/flow/pass_through.go @@ -28,15 +28,17 @@ func NewPassThrough() *PassThrough { return passThrough } -// Via streams data to a specified Flow and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (pt *PassThrough) Via(flow streams.Flow) streams.Flow { go pt.transmit(flow) return flow } -// To streams data to a specified Sink. +// To streams data to the given Sink and blocks until the Sink has completed +// processing all data. func (pt *PassThrough) To(sink streams.Sink) { pt.transmit(sink) + sink.AwaitCompletion() } // Out returns the output channel of the PassThrough operator. diff --git a/flow/reduce.go b/flow/reduce.go index 276d0e3..ca8e3bf 100644 --- a/flow/reduce.go +++ b/flow/reduce.go @@ -40,15 +40,17 @@ func NewReduce[T any](reduceFunction ReduceFunction[T]) *Reduce[T] { return reduce } -// Via streams data to a specified Flow and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (r *Reduce[T]) Via(flow streams.Flow) streams.Flow { go r.transmit(flow) return flow } -// To streams data to a specified Sink. +// To streams data to the given Sink and blocks until the Sink has completed +// processing all data. func (r *Reduce[T]) To(sink streams.Sink) { r.transmit(sink) + sink.AwaitCompletion() } // Out returns the output channel of the Reduce operator. diff --git a/flow/session_window.go b/flow/session_window.go index a6994c3..7020d0a 100644 --- a/flow/session_window.go +++ b/flow/session_window.go @@ -41,15 +41,17 @@ func NewSessionWindow[T any](inactivityGap time.Duration) *SessionWindow[T] { return sessionWindow } -// Via streams data to a specified Flow and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (sw *SessionWindow[T]) Via(flow streams.Flow) streams.Flow { go sw.transmit(flow) return flow } -// To streams data to a specified Sink. +// To streams data to the given Sink and blocks until the Sink has completed +// processing all data. func (sw *SessionWindow[T]) To(sink streams.Sink) { sw.transmit(sink) + sink.AwaitCompletion() } // Out returns the output channel of the SessionWindow operator. diff --git a/flow/sliding_window.go b/flow/sliding_window.go index 88f10c2..aed2bec 100644 --- a/flow/sliding_window.go +++ b/flow/sliding_window.go @@ -82,17 +82,19 @@ func NewSlidingWindowWithExtractor[T any]( return slidingWindow } -// Via streams data to a specified Flow and returns it. +// Via asynchronously streams data to the given Flow and returns it. 
func (sw *SlidingWindow[T]) Via(flow streams.Flow) streams.Flow { go sw.emit() go sw.transmit(flow) return flow } -// To streams data to a specified Sink. +// To streams data to the given Sink and blocks until the Sink has completed +// processing all data. func (sw *SlidingWindow[T]) To(sink streams.Sink) { go sw.emit() sw.transmit(sink) + sink.AwaitCompletion() } // Out returns the output channel of the SlidingWindow operator. diff --git a/flow/throttler.go b/flow/throttler.go index 44b63cf..27e9898 100644 --- a/flow/throttler.go +++ b/flow/throttler.go @@ -116,15 +116,17 @@ func (th *Throttler) buffer() { close(th.out) } -// Via streams data to a specified Flow and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (th *Throttler) Via(flow streams.Flow) streams.Flow { go th.streamPortioned(flow) return flow } -// To streams data to a specified Sink. +// To streams data to the given Sink and blocks until the Sink has completed +// processing all data. func (th *Throttler) To(sink streams.Sink) { th.streamPortioned(sink) + sink.AwaitCompletion() } // Out returns the output channel of the Throttler operator. diff --git a/flow/tumbling_window.go b/flow/tumbling_window.go index fc828e6..a74fdaa 100644 --- a/flow/tumbling_window.go +++ b/flow/tumbling_window.go @@ -39,15 +39,17 @@ func NewTumblingWindow[T any](size time.Duration) *TumblingWindow[T] { return tumblingWindow } -// Via streams data to a specified Flow and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (tw *TumblingWindow[T]) Via(flow streams.Flow) streams.Flow { go tw.transmit(flow) return flow } -// To streams data to a specified Sink. +// To streams data to the given Sink and blocks until the Sink has completed +// processing all data. func (tw *TumblingWindow[T]) To(sink streams.Sink) { tw.transmit(sink) + sink.AwaitCompletion() } // Out returns the output channel of the TumblingWindow operator. diff --git a/gcp/storage.go b/gcp/storage.go index 3d2e7c2..c8e2df6 100644 --- a/gcp/storage.go +++ b/gcp/storage.go @@ -33,6 +33,7 @@ type StorageSource struct { client *storage.Client config *StorageSourceConfig out chan any + logger *slog.Logger } @@ -119,7 +120,7 @@ func (s *StorageSource) readObjects(ctx context.Context) { } } -// Via streams data to a specified operator and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (s *StorageSource) Via(operator streams.Flow) streams.Flow { flow.DoStream(s, operator) return operator @@ -154,6 +155,8 @@ type StorageSink struct { client *storage.Client config *StorageSinkConfig in chan any + + done chan struct{} logger *slog.Logger } @@ -180,6 +183,7 @@ func NewStorageSink(ctx context.Context, client *storage.Client, client: client, config: config, in: make(chan any, config.Parallelism), + done: make(chan struct{}), logger: logger, } @@ -192,6 +196,8 @@ func NewStorageSink(ctx context.Context, client *storage.Client, // writeObjects writes incoming stream data elements to GCP Storage using the // configured parallelism. func (s *StorageSink) writeObjects(ctx context.Context) { + defer close(s.done) // signal data processing completion + bucketHandle := s.client.Bucket(s.config.Bucket) var wg sync.WaitGroup for i := 0; i < s.config.Parallelism; i++ { @@ -226,7 +232,13 @@ func (s *StorageSink) writeObjects(ctx context.Context) { // writeObject writes a single object to GCP Storage. 
func (s *StorageSink) writeObject(ctx context.Context, bucketHandle *storage.BucketHandle, object *StorageObject) error { - defer object.Data.Close() + defer func() { + if err := object.Data.Close(); err != nil { + s.logger.Warn("Failed to close object", + slog.String("key", object.Key), + slog.Any("error", err)) + } + }() // writes will be retried on transient errors from the server writer := bucketHandle.Object(object.Key).NewWriter(ctx) @@ -247,3 +259,9 @@ func (s *StorageSink) writeObject(ctx context.Context, bucketHandle *storage.Buc func (s *StorageSink) In() chan<- any { return s.in } + +// AwaitCompletion blocks until the StorageSink connector has completed +// processing all the received data. +func (s *StorageSink) AwaitCompletion() { + <-s.done +} diff --git a/kafka/kafka_sarama.go b/kafka/kafka_sarama.go index 34fb04b..2f8b136 100644 --- a/kafka/kafka_sarama.go +++ b/kafka/kafka_sarama.go @@ -16,7 +16,8 @@ type SaramaSource struct { handler sarama.ConsumerGroupHandler topics []string out chan any - logger *slog.Logger + + logger *slog.Logger } var _ streams.Source = (*SaramaSource)(nil) @@ -39,19 +40,21 @@ func NewSaramaSource(ctx context.Context, consumerGroup sarama.ConsumerGroup, logger: logger, } - source := &SaramaSource{ + saramaSource := &SaramaSource{ consumer: consumerGroup, handler: handler, topics: topics, out: out, logger: logger, } - go source.init(ctx) - return source + // asynchronously consume messages and send them downstream + go saramaSource.process(ctx) + + return saramaSource } -func (ks *SaramaSource) init(ctx context.Context) { +func (ks *SaramaSource) process(ctx context.Context) { loop: for { handler := ks.handler.(*groupHandler) @@ -69,14 +72,16 @@ loop: default: } } + ks.logger.Info("Closing connector") close(ks.out) + if err := ks.consumer.Close(); err != nil { ks.logger.Warn("Error in consumer.Close", slog.Any("error", err)) } } -// Via streams data to a specified operator and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (ks *SaramaSource) Via(operator streams.Flow) streams.Flow { flow.DoStream(ks, operator) return operator @@ -135,7 +140,9 @@ type SaramaSink struct { producer sarama.SyncProducer topic string in chan any - logger *slog.Logger + + done chan struct{} + logger *slog.Logger } var _ streams.Sink = (*SaramaSink)(nil) @@ -150,18 +157,23 @@ func NewSaramaSink(syncProducer sarama.SyncProducer, topic string, slog.String("name", "kafka.sarama"), slog.String("type", "sink"))) - sink := &SaramaSink{ + saramaSink := &SaramaSink{ producer: syncProducer, topic: topic, in: make(chan any), + done: make(chan struct{}), logger: logger, } - go sink.init() - return sink + // begin processing upstream records + go saramaSink.process() + + return saramaSink } -func (ks *SaramaSink) init() { +func (ks *SaramaSink) process() { + defer close(ks.done) // signal data processing completion + for msg := range ks.in { var err error switch message := msg.(type) { @@ -189,6 +201,7 @@ func (ks *SaramaSink) init() { ks.logger.Error("Error processing message", slog.Any("error", err)) } } + ks.logger.Info("Closing connector") if err := ks.producer.Close(); err != nil { ks.logger.Warn("Error in producer.Close", slog.Any("error", err)) @@ -199,3 +212,9 @@ func (ks *SaramaSink) init() { func (ks *SaramaSink) In() chan<- any { return ks.in } + +// AwaitCompletion blocks until the SaramaSink connector has completed +// processing all the received data. 
+func (ks *SaramaSink) AwaitCompletion() { + <-ks.done +} diff --git a/nats/nats_jetstream.go b/nats/nats_jetstream.go index dcf883d..356fa11 100644 --- a/nats/nats_jetstream.go +++ b/nats/nats_jetstream.go @@ -71,7 +71,8 @@ type JetStreamSource struct { config *JetStreamSourceConfig subscription *nats.Subscription out chan any - logger *slog.Logger + + logger *slog.Logger } var _ streams.Source = (*JetStreamSource)(nil) @@ -84,7 +85,7 @@ func NewJetStreamSource(ctx context.Context, config *JetStreamSourceConfig, subscription, err := config.JetStreamCtx.PullSubscribe(config.Subject, config.ConsumerName, config.SubOpts...) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to subscribe: %w", err) } if err := config.validate(); err != nil { return nil, err @@ -103,13 +104,14 @@ func NewJetStreamSource(ctx context.Context, config *JetStreamSourceConfig, out: make(chan any), logger: logger, } - go jetStreamSource.init(ctx) + + // asynchronously consume data and send it downstream + go jetStreamSource.process(ctx) return jetStreamSource, nil } -// init starts the stream processing loop. -func (js *JetStreamSource) init(ctx context.Context) { +func (js *JetStreamSource) process(ctx context.Context) { loop: for { select { @@ -150,11 +152,12 @@ loop: js.logger.Error("Failed to drain subscription", slog.Any("error", err)) } + js.logger.Info("Closing connector") close(js.out) } -// Via streams data to a specified operator and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (js *JetStreamSource) Via(operator streams.Flow) streams.Flow { flow.DoStream(js, operator) return operator @@ -198,6 +201,8 @@ func (config *JetStreamSinkConfig) validate() error { type JetStreamSink struct { config *JetStreamSinkConfig in chan any + + done chan struct{} logger *slog.Logger } @@ -221,15 +226,19 @@ func NewJetStreamSink(config *JetStreamSinkConfig, jetStreamSink := &JetStreamSink{ config: config, in: make(chan any), + done: make(chan struct{}), logger: logger, } - go jetStreamSink.init() + + // begin processing upstream data + go jetStreamSink.process() return jetStreamSink, nil } -// init starts the stream processing loop. -func (js *JetStreamSink) init() { +func (js *JetStreamSink) process() { + defer close(js.done) // signal data processing completion + for msg := range js.in { var err error switch message := msg.(type) { @@ -261,6 +270,7 @@ func (js *JetStreamSink) init() { slog.Any("error", err)) } } + js.logger.Info("Closing connector") } @@ -268,3 +278,9 @@ func (js *JetStreamSink) init() { func (js *JetStreamSink) In() chan<- any { return js.in } + +// AwaitCompletion blocks until the JetStreamSink connector has completed +// processing all the received data. 
+func (js *JetStreamSink) AwaitCompletion() { + <-js.done +} diff --git a/nats/nats_streaming.go b/nats/nats_streaming.go index 7d2b1ca..edb35cb 100644 --- a/nats/nats_streaming.go +++ b/nats/nats_streaming.go @@ -18,7 +18,8 @@ type StreamingSource struct { subscriptionType stan.SubscriptionOption topics []string out chan any - logger *slog.Logger + + logger *slog.Logger } var _ streams.Source = (*StreamingSource)(nil) @@ -42,15 +43,17 @@ func NewStreamingSource(ctx context.Context, conn stan.Conn, out: make(chan any), logger: logger, } - go streamingSource.init(ctx) + + // asynchronously consume data and send it downstream + go streamingSource.process(ctx) return streamingSource } -func (ns *StreamingSource) init(ctx context.Context) { +func (ns *StreamingSource) process(ctx context.Context) { // bind all topic subscribers for _, topic := range ns.topics { - sub, err := ns.conn.Subscribe(topic, func(msg *stan.Msg) { + subscription, err := ns.conn.Subscribe(topic, func(msg *stan.Msg) { ns.out <- msg }, ns.subscriptionType) if err != nil { @@ -59,9 +62,9 @@ func (ns *StreamingSource) init(ctx context.Context) { slog.Any("error", err)) continue } - ns.logger.Info("Subscribed to topic", - slog.String("topic", topic)) - ns.subscriptions = append(ns.subscriptions, sub) + + ns.logger.Info("Subscribed to topic", slog.String("topic", topic)) + ns.subscriptions = append(ns.subscriptions, subscription) } <-ctx.Done() @@ -69,6 +72,7 @@ func (ns *StreamingSource) init(ctx context.Context) { ns.logger.Info("Closing connector") close(ns.out) ns.unsubscribe() // unbind all topic subscriptions + if err := ns.conn.Close(); err != nil { ns.logger.Warn("Error in conn.Close", slog.Any("error", err)) } @@ -83,7 +87,7 @@ func (ns *StreamingSource) unsubscribe() { } } -// Via streams data to a specified operator and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (ns *StreamingSource) Via(operator streams.Flow) streams.Flow { flow.DoStream(ns, operator) return operator @@ -97,9 +101,11 @@ func (ns *StreamingSource) Out() <-chan any { // StreamingSink represents a NATS Streaming sink connector. // Deprecated: Use [JetStreamSink] instead. type StreamingSink struct { - conn stan.Conn - topic string - in chan any + conn stan.Conn + topic string + in chan any + + done chan struct{} logger *slog.Logger } @@ -119,14 +125,19 @@ func NewStreamingSink(conn stan.Conn, topic string, conn: conn, topic: topic, in: make(chan any), + done: make(chan struct{}), logger: logger, } - go streamingSink.init() + + // begin processing upstream data + go streamingSink.process() return streamingSink } -func (ns *StreamingSink) init() { +func (ns *StreamingSink) process() { + defer close(ns.done) // signal data processing completion + for msg := range ns.in { var err error switch message := msg.(type) { @@ -144,6 +155,7 @@ func (ns *StreamingSink) init() { slog.Any("error", err)) } } + ns.logger.Info("Closing connector") if err := ns.conn.Close(); err != nil { ns.logger.Warn("Error in conn.Close", slog.Any("error", err)) @@ -154,3 +166,9 @@ func (ns *StreamingSink) init() { func (ns *StreamingSink) In() chan<- any { return ns.in } + +// AwaitCompletion blocks until the StreamingSink connector has completed +// processing all the received data. 
+func (ns *StreamingSink) AwaitCompletion() { + <-ns.done +} diff --git a/pulsar/pulsar.go b/pulsar/pulsar.go index 9f28aee..8eb4af0 100644 --- a/pulsar/pulsar.go +++ b/pulsar/pulsar.go @@ -15,7 +15,8 @@ type Source struct { client pulsar.Client consumer pulsar.Consumer out chan any - logger *slog.Logger + + logger *slog.Logger } var _ streams.Source = (*Source)(nil) @@ -25,12 +26,12 @@ func NewSource(ctx context.Context, clientOptions *pulsar.ClientOptions, consumerOptions *pulsar.ConsumerOptions, logger *slog.Logger) (*Source, error) { client, err := pulsar.NewClient(*clientOptions) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create client: %w", err) } consumer, err := client.Subscribe(*consumerOptions) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to subscribe: %w", err) } if logger == nil { @@ -46,12 +47,14 @@ func NewSource(ctx context.Context, clientOptions *pulsar.ClientOptions, out: make(chan any), logger: logger, } - go source.init(ctx) + + // asynchronously consume data and send it downstream + go source.process(ctx) return source, nil } -func (ps *Source) init(ctx context.Context) { +func (ps *Source) process(ctx context.Context) { loop: for { select { @@ -68,19 +71,20 @@ loop: ps.out <- msg } } + ps.logger.Info("Closing connector") close(ps.out) ps.consumer.Close() ps.client.Close() } -// Via streams data to a specified operator and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (ps *Source) Via(operator streams.Flow) streams.Flow { flow.DoStream(ps, operator) return operator } -// Out returns the output channel of the PulsarSource connector. +// Out returns the output channel of the Source connector. func (ps *Source) Out() <-chan any { return ps.out } @@ -90,7 +94,9 @@ type Sink struct { client pulsar.Client producer pulsar.Producer in chan any - logger *slog.Logger + + done chan struct{} + logger *slog.Logger } var _ streams.Sink = (*Sink)(nil) @@ -100,12 +106,12 @@ func NewSink(ctx context.Context, clientOptions *pulsar.ClientOptions, producerOptions *pulsar.ProducerOptions, logger *slog.Logger) (*Sink, error) { client, err := pulsar.NewClient(*clientOptions) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create client: %w", err) } producer, err := client.CreateProducer(*producerOptions) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create producer: %w", err) } if logger == nil { @@ -119,14 +125,19 @@ func NewSink(ctx context.Context, clientOptions *pulsar.ClientOptions, client: client, producer: producer, in: make(chan any), + done: make(chan struct{}), logger: logger, } - go sink.init(ctx) + + // begin processing upstream data + go sink.process(ctx) return sink, nil } -func (ps *Sink) init(ctx context.Context) { +func (ps *Sink) process(ctx context.Context) { + defer close(ps.done) // signal data processing completion + for msg := range ps.in { var err error switch message := msg.(type) { @@ -147,12 +158,19 @@ func (ps *Sink) init(ctx context.Context) { ps.logger.Error("Error processing message", slog.Any("error", err)) } } + ps.logger.Info("Closing connector") ps.producer.Close() ps.client.Close() } -// In returns the input channel of the PulsarSink connector. +// In returns the input channel of the Sink connector. func (ps *Sink) In() chan<- any { return ps.in } + +// AwaitCompletion blocks until the Sink connector has completed +// processing all the received data. 
+func (ps *Sink) AwaitCompletion() { + <-ps.done +} diff --git a/redis/redis_pubsub.go b/redis/redis_pubsub.go index dbd1d37..aef4d10 100644 --- a/redis/redis_pubsub.go +++ b/redis/redis_pubsub.go @@ -20,7 +20,8 @@ type PubSubSource struct { redisClient *redis.Client channel string out chan any - logger *slog.Logger + + logger *slog.Logger } var _ streams.Source = (*PubSubSource)(nil) @@ -37,9 +38,8 @@ func NewPubSubSource(ctx context.Context, redisClient *redis.Client, // wait for a confirmation that subscription is created before // publishing anything - _, err := pubSub.Receive(ctx) - if err != nil { - return nil, err + if _, err := pubSub.Receive(ctx); err != nil { + return nil, fmt.Errorf("failed to receive: %w", err) } if logger == nil { @@ -55,30 +55,34 @@ func NewPubSubSource(ctx context.Context, redisClient *redis.Client, out: make(chan any), logger: logger, } - go source.init(ctx, pubSub.Channel()) + + // asynchronously consume data and send it downstream + go source.process(ctx, pubSub.Channel()) return source, nil } -func (ps *PubSubSource) init(ctx context.Context, ch <-chan *redis.Message) { +func (ps *PubSubSource) process(ctx context.Context, ch <-chan *redis.Message) { loop: for { select { case <-ctx.Done(): break loop // route incoming messages downstream - case msg := <-ch: - ps.out <- msg + case message := <-ch: + ps.out <- message } } + ps.logger.Info("Closing connector") close(ps.out) + if err := ps.redisClient.Close(); err != nil { ps.logger.Warn("Error in client.Close", slog.Any("error", err)) } } -// Via streams data to a specified operator and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (ps *PubSubSource) Via(operator streams.Flow) streams.Flow { flow.DoStream(ps, operator) return operator @@ -94,7 +98,9 @@ type PubSubSink struct { redisClient *redis.Client channel string in chan any - logger *slog.Logger + + done chan struct{} + logger *slog.Logger } var _ streams.Sink = (*PubSubSink)(nil) @@ -116,14 +122,19 @@ func NewPubSubSink(ctx context.Context, redisClient *redis.Client, redisClient: redisClient, channel: channel, in: make(chan any), + done: make(chan struct{}), logger: logger, } - go sink.init(ctx) + + // begin processing upstream data + go sink.process(ctx) return sink } -func (ps *PubSubSink) init(ctx context.Context) { +func (ps *PubSubSink) process(ctx context.Context) { + defer close(ps.done) // signal data processing completion + for msg := range ps.in { switch message := msg.(type) { case string: @@ -135,6 +146,7 @@ func (ps *PubSubSink) init(ctx context.Context) { slog.String("type", fmt.Sprintf("%T", message))) } } + ps.logger.Info("Closing connector") if err := ps.redisClient.Close(); err != nil { ps.logger.Warn("Error in client.Close", slog.Any("error", err)) @@ -145,3 +157,9 @@ func (ps *PubSubSink) init(ctx context.Context) { func (ps *PubSubSink) In() chan<- any { return ps.in } + +// AwaitCompletion blocks until the PubSubSink connector has completed +// processing all the received data. 
+func (ps *PubSubSink) AwaitCompletion() { + <-ps.done +} diff --git a/redis/redis_stream.go b/redis/redis_stream.go index 26296a6..571bd91 100644 --- a/redis/redis_stream.go +++ b/redis/redis_stream.go @@ -22,7 +22,8 @@ type StreamSource struct { readGroupArgs *redis.XReadGroupArgs groupCreateArgs *XGroupCreateArgs out chan any - logger *slog.Logger + + logger *slog.Logger } var _ streams.Source = (*StreamSource)(nil) @@ -65,7 +66,7 @@ func NewStreamSource(ctx context.Context, redisClient *redis.Client, groupCreateArgs.StartID).Err() } if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create consumer group: %w", err) } } @@ -83,12 +84,14 @@ func NewStreamSource(ctx context.Context, redisClient *redis.Client, out: make(chan any), logger: logger, } - go source.init(ctx) + + // asynchronously consume data and send it downstream + go source.process(ctx) return source, nil } -func (rs *StreamSource) init(ctx context.Context) { +func (rs *StreamSource) process(ctx context.Context) { loop: for { select { @@ -99,20 +102,20 @@ loop: // support for consumer groups. entries, err := rs.redisClient.XReadGroup(ctx, rs.readGroupArgs).Result() if err != nil { - rs.logger.Error("Error in client.XReadGroup", - slog.Any("error", err)) + rs.logger.Error("Error in client.XReadGroup", slog.Any("error", err)) if strings.HasPrefix(err.Error(), "NOGROUP") { break loop } } // route incoming messages downstream for _, stream := range entries { - for _, msg := range stream.Messages { - rs.out <- &msg + for _, message := range stream.Messages { + rs.out <- &message } } } } + rs.logger.Info("Closing connector") close(rs.out) if err := rs.redisClient.Close(); err != nil { @@ -120,7 +123,7 @@ loop: } } -// Via streams data to a specified operator and returns it. +// Via asynchronously streams data to the given Flow and returns it. func (rs *StreamSource) Via(operator streams.Flow) streams.Flow { flow.DoStream(rs, operator) return operator @@ -136,7 +139,9 @@ type StreamSink struct { redisClient *redis.Client stream string in chan any - logger *slog.Logger + + done chan struct{} + logger *slog.Logger } var _ streams.Sink = (*StreamSink)(nil) @@ -158,14 +163,19 @@ func NewStreamSink(ctx context.Context, redisClient *redis.Client, redisClient: redisClient, stream: stream, in: make(chan any), + done: make(chan struct{}), logger: logger, } - go sink.init(ctx) + + // begin processing upstream data + go sink.process(ctx) return sink } -func (rs *StreamSink) init(ctx context.Context) { +func (rs *StreamSink) process(ctx context.Context) { + defer close(rs.done) // signal data processing completion + for msg := range rs.in { switch message := msg.(type) { case *redis.XMessage: @@ -183,6 +193,7 @@ func (rs *StreamSink) init(ctx context.Context) { slog.String("type", fmt.Sprintf("%T", message))) } } + rs.logger.Info("Closing connector") if err := rs.redisClient.Close(); err != nil { rs.logger.Warn("Error in client.Close", slog.Any("error", err)) @@ -202,3 +213,9 @@ func (rs *StreamSink) xAdd(ctx context.Context, args *redis.XAddArgs) { func (rs *StreamSink) In() chan<- any { return rs.in } + +// AwaitCompletion blocks until the StreamSink connector has completed +// processing all the received data. +func (rs *StreamSink) AwaitCompletion() { + <-rs.done +} diff --git a/streams.go b/streams.go index 42c2d55..7f45c59 100644 --- a/streams.go +++ b/streams.go @@ -2,11 +2,19 @@ package streams // Inlet represents a type that exposes one open input. 
type Inlet interface { + // In returns the input channel for the Inlet. + // Data sent to this channel will be consumed by the component that implements + // this interface. This channel should be closed by the upstream component + // when no more input is expected. In() chan<- any } // Outlet represents a type that exposes one open output. type Outlet interface { + // Out returns the output channel for the Outlet. + // Data sent to this channel can be consumed by another component further + // in the processing pipeline. This channel should be closed by the implementing + // component when upstream processing has been completed. Out() <-chan any } @@ -16,6 +24,8 @@ type Outlet interface { // Implement this interface to create a custom source connector. type Source interface { Outlet + // Via asynchronously streams data from the Source's Outlet to the given Flow. + // It should return a new Flow that represents the combined pipeline. Via(Flow) Flow } @@ -26,7 +36,12 @@ type Source interface { type Flow interface { Inlet Outlet + // Via asynchronously streams data from the Flow's Outlet to the given Flow. + // It should return a new Flow that represents the combined pipeline. Via(Flow) Flow + // To streams data from the Flow's Outlet to the given Sink, and should block + // until the Sink has completed processing all data, which can be verified + // via the Sink's AwaitCompletion method. To(Sink) } @@ -36,4 +51,10 @@ type Flow interface { // Implement this interface to create a custom sink connector. type Sink interface { Inlet + // AwaitCompletion should block until the Sink has completed processing + // all data received through its Inlet and has finished any necessary + // finalization or cleanup tasks. + // This method is intended for internal use by the pipeline when the + // input stream is closed by the upstream. + AwaitCompletion() } diff --git a/websocket/web_socket.go b/websocket/web_socket.go index 35f8abb..911b4d0 100644 --- a/websocket/web_socket.go +++ b/websocket/web_socket.go @@ -23,7 +23,8 @@ type Message struct { type Source struct { connection *ws.Conn out chan any - logger *slog.Logger + + logger *slog.Logger } var _ streams.Source = (*Source)(nil) @@ -33,13 +34,13 @@ func NewSource(ctx context.Context, url string, logger *slog.Logger) (*Source, e return NewSourceWithDialer(ctx, url, ws.DefaultDialer, logger) } -// NewSourceWithDialer returns a new [Source] using the specified dialer. +// NewSourceWithDialer returns a new [Source] using the provided dialer. 
func NewSourceWithDialer(ctx context.Context, url string, dialer *ws.Dialer, logger *slog.Logger) (*Source, error) { // create a new client connection conn, _, err := dialer.Dial(url, nil) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to dial: %w", err) } if logger == nil { @@ -54,57 +55,63 @@ func NewSourceWithDialer(ctx context.Context, url string, out: make(chan any), logger: logger, } - go source.init(ctx) + + // asynchronously consume data and send it downstream + go source.process(ctx) return source, nil } -func (wsock *Source) init(ctx context.Context) { +func (s *Source) process(ctx context.Context) { loop: for { select { case <-ctx.Done(): break loop default: - messageType, payload, err := wsock.connection.ReadMessage() + messageType, payload, err := s.connection.ReadMessage() if err != nil { - wsock.logger.Error("Error in connection.ReadMessage", + s.logger.Error("Error in connection.ReadMessage", slog.Any("error", err)) } else { // exit loop on CloseMessage if messageType == ws.CloseMessage { break loop } - wsock.out <- Message{ + s.out <- Message{ MsgType: messageType, Payload: payload, } } } } - wsock.logger.Info("Closing connector") - close(wsock.out) - if err := wsock.connection.Close(); err != nil { - wsock.logger.Warn("Error in connection.Close", slog.Any("error", err)) + + s.logger.Info("Closing connector") + close(s.out) + + if err := s.connection.Close(); err != nil { + s.logger.Warn("Error in connection.Close", slog.Any("error", err)) } } -// Via streams data to a specified operator and returns it. -func (wsock *Source) Via(operator streams.Flow) streams.Flow { - flow.DoStream(wsock, operator) +// Via asynchronously streams data to the given Flow and returns it. +func (s *Source) Via(operator streams.Flow) streams.Flow { + flow.DoStream(s, operator) return operator } // Out returns the output channel of the Source connector. -func (wsock *Source) Out() <-chan any { - return wsock.out +func (s *Source) Out() <-chan any { + return s.out } // Sink represents a WebSocket sink connector. type Sink struct { connection *ws.Conn in chan any - logger *slog.Logger + + done chan struct{} + logger *slog.Logger } var _ streams.Sink = (*Sink)(nil) @@ -114,12 +121,12 @@ func NewSink(url string, logger *slog.Logger) (*Sink, error) { return NewSinkWithDialer(url, ws.DefaultDialer, logger) } -// NewSinkWithDialer returns a new [Sink] using the specified dialer. +// NewSinkWithDialer returns a new [Sink] using the provided dialer. 
func NewSinkWithDialer(url string, dialer *ws.Dialer, logger *slog.Logger) (*Sink, error) { // create a new client connection conn, _, err := dialer.Dial(url, nil) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to dial: %w", err) } if logger == nil { @@ -132,42 +139,53 @@ func NewSinkWithDialer(url string, dialer *ws.Dialer, logger *slog.Logger) (*Sin sink := &Sink{ connection: conn, in: make(chan any), + done: make(chan struct{}), logger: logger, } - go sink.init() + + // begin processing upstream data + go sink.process() return sink, nil } -func (wsock *Sink) init() { - for msg := range wsock.in { +func (s *Sink) process() { + defer close(s.done) // signal data processing completion + + for msg := range s.in { var err error - switch m := msg.(type) { + switch message := msg.(type) { case Message: - err = wsock.connection.WriteMessage(m.MsgType, m.Payload) + err = s.connection.WriteMessage(message.MsgType, message.Payload) case *Message: - err = wsock.connection.WriteMessage(m.MsgType, m.Payload) + err = s.connection.WriteMessage(message.MsgType, message.Payload) case string: - err = wsock.connection.WriteMessage(ws.TextMessage, []byte(m)) + err = s.connection.WriteMessage(ws.TextMessage, []byte(message)) case []byte: - err = wsock.connection.WriteMessage(ws.BinaryMessage, m) + err = s.connection.WriteMessage(ws.BinaryMessage, message) default: - wsock.logger.Error("Unsupported message type", - slog.String("type", fmt.Sprintf("%T", m))) + s.logger.Error("Unsupported message type", + slog.String("type", fmt.Sprintf("%T", message))) } if err != nil { - wsock.logger.Error("Error processing message", - slog.Any("error", err)) + s.logger.Error("Error processing message", slog.Any("error", err)) } } - wsock.logger.Info("Closing connector") - if err := wsock.connection.Close(); err != nil { - wsock.logger.Warn("Error in connection.Close", slog.Any("error", err)) + + s.logger.Info("Closing connector") + if err := s.connection.Close(); err != nil { + s.logger.Warn("Error in connection.Close", slog.Any("error", err)) } } // In returns the input channel of the Sink connector. -func (wsock *Sink) In() chan<- any { - return wsock.in +func (s *Sink) In() chan<- any { + return s.in +} + +// AwaitCompletion blocks until the Sink connector has completed +// processing all the received data. +func (s *Sink) AwaitCompletion() { + <-s.done }
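
Usage sketch (reviewer note, not part of the patch): a minimal illustration of the new blocking semantics, assuming the extension and flow packages as modified above. With this change, To invokes the sink's AwaitCompletion internally, so a pipeline that terminates in a FileSink returns only after the sink has drained its input channel and closed the output file, which is why the time.Sleep calls were removed from the examples. The input data and the "out.txt" path below are illustrative.

package main

import (
	"strings"

	ext "github.com/reugn/go-streams/extension"
	"github.com/reugn/go-streams/flow"
)

func main() {
	// ChanSource closes its output once the input channel is closed,
	// allowing the downstream operators and the sink to terminate naturally.
	in := make(chan any)
	go func() {
		defer close(in)
		for _, line := range []string{"alpha", "beta", "gamma"} {
			in <- line
		}
	}()

	source := ext.NewChanSource(in)
	upperFlow := flow.NewMap(func(s string) string {
		return strings.ToUpper(s) + "\n"
	}, 1)
	sink := ext.NewFileSink("out.txt") // illustrative output path

	// To blocks until the FileSink signals completion via AwaitCompletion,
	// so no artificial delay is needed before main returns.
	source.
		Via(upperFlow).
		To(sink)
}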