Skip to content

Commit

Permalink
feat: get event streaming working
Browse files Browse the repository at this point in the history
  • Loading branch information
w-h-a committed Jul 2, 2024
1 parent f38cf0c commit 99c0876
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 0 deletions.
171 changes: 171 additions & 0 deletions streams/grpcstreams/grpc_streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package grpcstreams

import (
"encoding/json"
"fmt"
"sync"
"time"

"github.com/google/uuid"
"github.com/w-h-a/pkg/store"
"github.com/w-h-a/pkg/streams"
"github.com/w-h-a/pkg/telemetry/log"
)

type grpcStreams struct {
options streams.StreamsOptions
store store.Store
subscribers map[string]streams.Subscriber
mtx sync.RWMutex
}

func (s *grpcStreams) Options() streams.StreamsOptions {
return s.options
}

func (s *grpcStreams) Subscribe(id string, opts ...streams.SubscribeOption) error {
sub := NewSubscriber(opts...)

s.mtx.Lock()
s.subscribers[id] = sub
s.mtx.Unlock()

return nil
}

func (s *grpcStreams) Unsubscribe(id string) error {
s.mtx.Lock()
defer s.mtx.Unlock()

delete(s.subscribers, id)

return nil
}

func (s *grpcStreams) Consume(id string, opts ...streams.ConsumeOption) (<-chan streams.Event, error) {
options := streams.NewConsumeOptions(opts...)

s.mtx.RLock()
sub, ok := s.subscribers[id]
if !ok {
s.mtx.RUnlock()
return nil, streams.ErrSubscriberNotFound
}
s.mtx.RUnlock()

if options.Offset.Unix() > 0 {
go s.lookupPreviousEvents(sub, options.Offset)
}

return sub.Channel(), nil
}

func (s *grpcStreams) Produce(topic string, data interface{}, opts ...streams.ProduceOption) error {
options := streams.NewProduceOptions(opts...)

var payload []byte

if p, ok := data.([]byte); ok {
payload = p
} else {
p, err := json.Marshal(data)
if err != nil {
return streams.ErrEncodingData
}
payload = p
}

event := &streams.Event{
Id: uuid.New().String(),
Topic: topic,
Payload: payload,
Timestamp: options.Timestamp,
Metadata: options.Metadata,
}

bytes, err := json.Marshal(event)
if err != nil {
return streams.ErrEncodingEvent
}

key := fmt.Sprintf("%v:%v", event.Topic, event.Id)

if err := s.store.Write(&store.Record{
Key: key,
Value: bytes,
}); err != nil {
return fmt.Errorf("failed to write to event store: %v", err)
}

go s.handleEvent(event)

return nil
}

func (s *grpcStreams) String() string {
return "grpc"
}

func (s *grpcStreams) lookupPreviousEvents(sub streams.Subscriber, startTime time.Time) {
recs, err := s.store.Read(sub.Options().Topic+":", store.ReadWithPrefix())
if err != nil {
log.Errorf("failed to find any previous events: %v", err)
return
}

for _, rec := range recs {
var event streams.Event

if err := json.Unmarshal(rec.Value, &event); err != nil {
continue
}

if event.Timestamp.Unix() < startTime.Unix() {
continue
}

if err := SendEvent(sub, &event); err != nil {
log.Errorf("failed to send previous event: %v", err)
continue
}
}
}

func (s *grpcStreams) handleEvent(ev *streams.Event) {
s.mtx.RLock()
subs := s.subscribers
s.mtx.RUnlock()

groupedSubscribers := map[string]streams.Subscriber{}

for _, sub := range subs {
if len(sub.Options().Topic) == 0 || sub.Options().Topic == ev.Topic {
groupedSubscribers[sub.Options().Group] = sub
}
}

for _, sub := range groupedSubscribers {
go func(sub streams.Subscriber) {
if err := SendEvent(sub, ev); err != nil {
log.Errorf("failed to handle event: %v", err)
}
}(sub)
}
}

func NewStreams(opts ...streams.StreamsOption) streams.Streams {
options := streams.NewStreamsOptions(opts...)

g := &grpcStreams{
options: options,
subscribers: map[string]streams.Subscriber{},
mtx: sync.RWMutex{},
}

s, ok := GetStoreFromContext(options.Context)
if ok {
g.store = s
}

return g
}
1 change: 1 addition & 0 deletions streams/grpcstreams/grpc_streams_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package grpcstreams
67 changes: 67 additions & 0 deletions streams/grpcstreams/grpc_subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package grpcstreams

import (
"sync"

"github.com/w-h-a/pkg/streams"
)

type grpcSubscriber struct {
options streams.SubscribeOptions
channel chan streams.Event
retryMap map[string]int
mtx sync.RWMutex
}

func (s *grpcSubscriber) Options() streams.SubscribeOptions {
return s.options
}

func (s *grpcSubscriber) Channel() chan streams.Event {
return s.channel
}

func (s *grpcSubscriber) Ack(ev streams.Event) error {
s.mtx.Lock()
defer s.mtx.Unlock()

delete(s.retryMap, ev.Id)

return nil
}

func (s *grpcSubscriber) Nack(ev streams.Event) error {
return nil
}

func (s *grpcSubscriber) SetAttemptCount(c int, ev streams.Event) {
s.mtx.Lock()
defer s.mtx.Unlock()

s.retryMap[ev.Id] = c
}

func (s *grpcSubscriber) GetAttemptCount(ev streams.Event) (int, bool) {
s.mtx.RLock()
defer s.mtx.RUnlock()

count, ok := s.retryMap[ev.Id]
return count, ok
}

func (s *grpcSubscriber) String() string {
return "grpc"
}

func NewSubscriber(opts ...streams.SubscribeOption) streams.Subscriber {
options := streams.NewSubscribeOptions(opts...)

s := &grpcSubscriber{
options: options,
channel: make(chan streams.Event),
retryMap: map[string]int{},
mtx: sync.RWMutex{},
}

return s
}
21 changes: 21 additions & 0 deletions streams/grpcstreams/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package grpcstreams

import (
"context"

"github.com/w-h-a/pkg/security/token"
"github.com/w-h-a/pkg/store"
)

type storeKey struct{}

func GrpcStreamsWithStore(s store.Store) token.TokenOption {
return func(o *token.TokenOptions) {
o.Context = context.WithValue(o.Context, storeKey{}, s)
}
}

func GetStoreFromContext(ctx context.Context) (store.Store, bool) {
s, ok := ctx.Value(storeKey{}).(store.Store)
return s, ok
}
55 changes: 55 additions & 0 deletions streams/grpcstreams/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package grpcstreams

import (
"fmt"
"time"

"github.com/w-h-a/pkg/streams"
)

func SendEvent(sub streams.Subscriber, event *streams.Event) error {
cpy := *event

if sub.Options().AutoAck {
sub.Channel() <- cpy
return nil
}

cpy.SetAck(Ack(sub, cpy))
cpy.SetNack(Nack(sub, cpy))

sub.SetAttemptCount(0, cpy)

tick := time.NewTicker(sub.Options().AckWait)
defer tick.Stop()

for range tick.C {
count, ok := sub.GetAttemptCount(cpy)
if !ok {
break
}

if sub.Options().RetryLimit > -1 && count > sub.Options().RetryLimit {
sub.Ack(cpy)
return fmt.Errorf("discarding event %s because the number of attempts %d exceeded the retry limit %d", cpy.Id, count, sub.Options().RetryLimit)
}

sub.Channel() <- cpy

sub.SetAttemptCount(count+1, cpy)
}

return nil
}

func Ack(sub streams.Subscriber, event streams.Event) func() error {
return func() error {
return sub.Ack(event)
}
}

func Nack(sub streams.Subscriber, event streams.Event) func() error {
return func() error {
return sub.Nack(event)
}
}
11 changes: 11 additions & 0 deletions streams/subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package streams

type Subscriber interface {
Options() SubscribeOptions
Channel() chan Event
Ack(ev Event) error
Nack(ev Event) error
SetAttemptCount(c int, ev Event)
GetAttemptCount(ev Event) (int, bool)
String() string
}

0 comments on commit 99c0876

Please sign in to comment.