Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Redis stream connector #80

Merged
merged 1 commit into from
Aug 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/redis/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
redis
stream/stream
pubsub/pubsub
2 changes: 1 addition & 1 deletion examples/redis/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ version: '3'
services:
redis:
image: redis
container_name: pubsub
container_name: redis-streams
ports:
- 6379:6379
68 changes: 68 additions & 0 deletions examples/redis/stream/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"context"
"fmt"
"log"
"strings"
"time"

rs "github.com/reugn/go-streams/redis"

"github.com/redis/go-redis/v9"
"github.com/reugn/go-streams/flow"
)

// XADD stream1 * key1 a key2 b key3 c
// XLEN stream2
// XREAD COUNT 1 BLOCK 100 STREAMS stream2 0
func main() {
ctx, cancelFunc := context.WithCancel(context.Background())

timer := time.NewTimer(time.Minute * 30)
go func() {
<-timer.C
cancelFunc()
}()

config := &redis.Options{
Addr: "localhost:6379", // use default Addr
Password: "", // no password set
DB: 0, // use default DB
}

redisClient := redis.NewClient(config)

readGroupArgs := &redis.XReadGroupArgs{
Group: "group1",
Consumer: "consumer1",
Streams: []string{"stream1", ">"},
}
// groupCreateArgs := &rs.XGroupCreateArgs{
// Stream: "stream1",
// Group: "group1",
// StartID: "$",
// MkStream: true,
// }
source, err := rs.NewStreamSource(ctx, redisClient, readGroupArgs, nil)
if err != nil {
log.Fatal(err)
}

toUpperMapFlow := flow.NewMap(toUpper, 1)
sink := rs.NewStreamSink(ctx, redisClient, "stream2")

source.
Via(toUpperMapFlow).
To(sink)
}

var toUpper = func(msg *redis.XMessage) *redis.XMessage {
fmt.Printf("Got: %v\n", msg.Values)
values := make(map[string]interface{}, len(msg.Values))
for key, element := range msg.Values {
values[key] = strings.ToUpper(fmt.Sprintf("%v", element))
}
msg.Values = values
return msg
}
2 changes: 1 addition & 1 deletion redis/doc.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
// Package redis implements the Redis Pub/Sub connector.
// Package redis implements the Redis streaming connectors.
package redis
177 changes: 177 additions & 0 deletions redis/redis_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package redis

import (
"context"
"log"

"github.com/redis/go-redis/v9"
"github.com/reugn/go-streams"
"github.com/reugn/go-streams/flow"
)

// StreamSource represents a Redis stream source connector.
//
// A Redis stream is a data structure that acts like an append-only log but
// also implements several operations to overcome some of the limits of a typical
// append-only log. These include random access in O(1) time and complex
// consumption strategies, such as consumer groups.
type StreamSource struct {
ctx context.Context
redisClient *redis.Client
readGroupArgs *redis.XReadGroupArgs
groupCreateArgs *XGroupCreateArgs
out chan interface{}
}

// XGroupCreateArgs represents the arguments for creating a consumer group.
//
// Use the special StartID "$" to fetch only the new elements arriving in the stream.
// If instead you want the consumer to fetch the whole stream history,
// use zero ("0") as the starting ID for the consumer group.
type XGroupCreateArgs struct {
Stream string
Group string
StartID string
MkStream bool // set to true to create an empty stream automatically
}

// NewStreamSource returns a new StreamSource instance.
// Pass in nil for the groupCreateArgs parameter if the consumer group already exists.
func NewStreamSource(ctx context.Context, redisClient *redis.Client,
readGroupArgs *redis.XReadGroupArgs, groupCreateArgs *XGroupCreateArgs) (*StreamSource, error) {
if groupCreateArgs != nil {
// Create a new consumer group uniquely identified by <group> for the stream stored at <stream>.
// By default, the XGROUP CREATE command expects that the target stream exists,
// and returns an error when it doesn't.
var err error
if groupCreateArgs.MkStream {
err = redisClient.XGroupCreateMkStream(
ctx,
groupCreateArgs.Stream,
groupCreateArgs.Group,
groupCreateArgs.StartID).Err()
} else {
err = redisClient.XGroupCreate(
ctx,
groupCreateArgs.Stream,
groupCreateArgs.Group,
groupCreateArgs.StartID).Err()
}
if err != nil {
return nil, err
}
}

source := &StreamSource{
ctx: ctx,
redisClient: redisClient,
readGroupArgs: readGroupArgs,
groupCreateArgs: groupCreateArgs,
out: make(chan interface{}),
}

go source.init()
return source, nil
}

// init starts the main loop
func (rs *StreamSource) init() {
loop:
for {
select {
case <-rs.ctx.Done():
break loop

default:
// The XREADGROUP command is a special version of the XREAD command with
// support for consumer groups.
entries, err := rs.redisClient.XReadGroup(rs.ctx, rs.readGroupArgs).Result()
if err != nil {
log.Printf("Error in redisClient.XReadGroup: %s", err)
}

for _, e := range entries {
for _, msg := range e.Messages {
rs.out <- &msg
}
}
}
}

log.Printf("Closing Redis stream consumer")
close(rs.out)
rs.redisClient.Close()
}

// Via streams data through the given flow
func (rs *StreamSource) Via(_flow streams.Flow) streams.Flow {
flow.DoStream(rs, _flow)
return _flow
}

// Out returns an output channel for sending data
func (rs *StreamSource) Out() <-chan interface{} {
return rs.out
}

// StreamSink represents a Redis stream sink connector.
type StreamSink struct {
ctx context.Context
redisClient *redis.Client
stream string
in chan interface{}
}

// NewStreamSink returns a new StreamSink instance.
//
// The incoming messages will be streamed to the given target stream using the
// provided redis.Client.
func NewStreamSink(ctx context.Context, redisClient *redis.Client, stream string) *StreamSink {
sink := &StreamSink{
ctx: ctx,
redisClient: redisClient,
stream: stream,
in: make(chan interface{}),
}

go sink.init()
return sink
}

// init starts the main loop
func (rs *StreamSink) init() {
for msg := range rs.in {
switch m := msg.(type) {
case *redis.XMessage:
rs.xAdd(&redis.XAddArgs{
Stream: rs.stream, // use the target stream name
Values: m.Values,
})
case map[string]interface{}:
rs.xAdd(&redis.XAddArgs{
Stream: rs.stream,
Values: m,
})
default:
log.Printf("Unsupported message type %v", m)
}
}

log.Printf("Closing Redis stream producer")
rs.redisClient.Close()
}

// xAdd appends the message to the target stream
func (rs *StreamSink) xAdd(args *redis.XAddArgs) {
// Streams are an append-only data structure. The fundamental write
// command, called XADD, appends a new entry to the specified stream.
err := rs.redisClient.XAdd(rs.ctx, args).Err()
if err != nil {
log.Printf("Error in redisClient.XAdd: %s", err)
}
}

// In returns an input channel for receiving data
func (rs *StreamSink) In() chan<- interface{} {
return rs.in
}