diff --git a/streams/grpcstreams/grpc_streams.go b/streams/grpcstreams/grpc_streams.go new file mode 100644 index 0000000..af706b7 --- /dev/null +++ b/streams/grpcstreams/grpc_streams.go @@ -0,0 +1,171 @@ +package grpcstreams + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/google/uuid" + "github.com/w-h-a/pkg/store" + "github.com/w-h-a/pkg/streams" + "github.com/w-h-a/pkg/telemetry/log" +) + +type grpcStreams struct { + options streams.StreamsOptions + store store.Store + subscribers map[string]streams.Subscriber + mtx sync.RWMutex +} + +func (s *grpcStreams) Options() streams.StreamsOptions { + return s.options +} + +func (s *grpcStreams) Subscribe(id string, opts ...streams.SubscribeOption) error { + sub := NewSubscriber(opts...) + + s.mtx.Lock() + s.subscribers[id] = sub + s.mtx.Unlock() + + return nil +} + +func (s *grpcStreams) Unsubscribe(id string) error { + s.mtx.Lock() + defer s.mtx.Unlock() + + delete(s.subscribers, id) + + return nil +} + +func (s *grpcStreams) Consume(id string, opts ...streams.ConsumeOption) (<-chan streams.Event, error) { + options := streams.NewConsumeOptions(opts...) + + s.mtx.RLock() + sub, ok := s.subscribers[id] + if !ok { + s.mtx.RUnlock() + return nil, streams.ErrSubscriberNotFound + } + s.mtx.RUnlock() + + if options.Offset.Unix() > 0 { + go s.lookupPreviousEvents(sub, options.Offset) + } + + return sub.Channel(), nil +} + +func (s *grpcStreams) Produce(topic string, data interface{}, opts ...streams.ProduceOption) error { + options := streams.NewProduceOptions(opts...) + + var payload []byte + + if p, ok := data.([]byte); ok { + payload = p + } else { + p, err := json.Marshal(data) + if err != nil { + return streams.ErrEncodingData + } + payload = p + } + + event := &streams.Event{ + Id: uuid.New().String(), + Topic: topic, + Payload: payload, + Timestamp: options.Timestamp, + Metadata: options.Metadata, + } + + bytes, err := json.Marshal(event) + if err != nil { + return streams.ErrEncodingEvent + } + + key := fmt.Sprintf("%v:%v", event.Topic, event.Id) + + if err := s.store.Write(&store.Record{ + Key: key, + Value: bytes, + }); err != nil { + return fmt.Errorf("failed to write to event store: %v", err) + } + + go s.handleEvent(event) + + return nil +} + +func (s *grpcStreams) String() string { + return "grpc" +} + +func (s *grpcStreams) lookupPreviousEvents(sub streams.Subscriber, startTime time.Time) { + recs, err := s.store.Read(sub.Options().Topic+":", store.ReadWithPrefix()) + if err != nil { + log.Errorf("failed to find any previous events: %v", err) + return + } + + for _, rec := range recs { + var event streams.Event + + if err := json.Unmarshal(rec.Value, &event); err != nil { + continue + } + + if event.Timestamp.Unix() < startTime.Unix() { + continue + } + + if err := SendEvent(sub, &event); err != nil { + log.Errorf("failed to send previous event: %v", err) + continue + } + } +} + +func (s *grpcStreams) handleEvent(ev *streams.Event) { + s.mtx.RLock() + subs := s.subscribers + s.mtx.RUnlock() + + groupedSubscribers := map[string]streams.Subscriber{} + + for _, sub := range subs { + if len(sub.Options().Topic) == 0 || sub.Options().Topic == ev.Topic { + groupedSubscribers[sub.Options().Group] = sub + } + } + + for _, sub := range groupedSubscribers { + go func(sub streams.Subscriber) { + if err := SendEvent(sub, ev); err != nil { + log.Errorf("failed to handle event: %v", err) + } + }(sub) + } +} + +func NewStreams(opts ...streams.StreamsOption) streams.Streams { + options := streams.NewStreamsOptions(opts...) + + g := &grpcStreams{ + options: options, + subscribers: map[string]streams.Subscriber{}, + mtx: sync.RWMutex{}, + } + + s, ok := GetStoreFromContext(options.Context) + if ok { + g.store = s + } + + return g +} diff --git a/streams/grpcstreams/grpc_streams_test.go b/streams/grpcstreams/grpc_streams_test.go new file mode 100644 index 0000000..5363cc2 --- /dev/null +++ b/streams/grpcstreams/grpc_streams_test.go @@ -0,0 +1 @@ +package grpcstreams \ No newline at end of file diff --git a/streams/grpcstreams/grpc_subscriber.go b/streams/grpcstreams/grpc_subscriber.go new file mode 100644 index 0000000..112b49a --- /dev/null +++ b/streams/grpcstreams/grpc_subscriber.go @@ -0,0 +1,67 @@ +package grpcstreams + +import ( + "sync" + + "github.com/w-h-a/pkg/streams" +) + +type grpcSubscriber struct { + options streams.SubscribeOptions + channel chan streams.Event + retryMap map[string]int + mtx sync.RWMutex +} + +func (s *grpcSubscriber) Options() streams.SubscribeOptions { + return s.options +} + +func (s *grpcSubscriber) Channel() chan streams.Event { + return s.channel +} + +func (s *grpcSubscriber) Ack(ev streams.Event) error { + s.mtx.Lock() + defer s.mtx.Unlock() + + delete(s.retryMap, ev.Id) + + return nil +} + +func (s *grpcSubscriber) Nack(ev streams.Event) error { + return nil +} + +func (s *grpcSubscriber) SetAttemptCount(c int, ev streams.Event) { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.retryMap[ev.Id] = c +} + +func (s *grpcSubscriber) GetAttemptCount(ev streams.Event) (int, bool) { + s.mtx.RLock() + defer s.mtx.RUnlock() + + count, ok := s.retryMap[ev.Id] + return count, ok +} + +func (s *grpcSubscriber) String() string { + return "grpc" +} + +func NewSubscriber(opts ...streams.SubscribeOption) streams.Subscriber { + options := streams.NewSubscribeOptions(opts...) + + s := &grpcSubscriber{ + options: options, + channel: make(chan streams.Event), + retryMap: map[string]int{}, + mtx: sync.RWMutex{}, + } + + return s +} diff --git a/streams/grpcstreams/options.go b/streams/grpcstreams/options.go new file mode 100644 index 0000000..ad0a026 --- /dev/null +++ b/streams/grpcstreams/options.go @@ -0,0 +1,21 @@ +package grpcstreams + +import ( + "context" + + "github.com/w-h-a/pkg/security/token" + "github.com/w-h-a/pkg/store" +) + +type storeKey struct{} + +func GrpcStreamsWithStore(s store.Store) token.TokenOption { + return func(o *token.TokenOptions) { + o.Context = context.WithValue(o.Context, storeKey{}, s) + } +} + +func GetStoreFromContext(ctx context.Context) (store.Store, bool) { + s, ok := ctx.Value(storeKey{}).(store.Store) + return s, ok +} diff --git a/streams/grpcstreams/utils.go b/streams/grpcstreams/utils.go new file mode 100644 index 0000000..1edc8cb --- /dev/null +++ b/streams/grpcstreams/utils.go @@ -0,0 +1,55 @@ +package grpcstreams + +import ( + "fmt" + "time" + + "github.com/w-h-a/pkg/streams" +) + +func SendEvent(sub streams.Subscriber, event *streams.Event) error { + cpy := *event + + if sub.Options().AutoAck { + sub.Channel() <- cpy + return nil + } + + cpy.SetAck(Ack(sub, cpy)) + cpy.SetNack(Nack(sub, cpy)) + + sub.SetAttemptCount(0, cpy) + + tick := time.NewTicker(sub.Options().AckWait) + defer tick.Stop() + + for range tick.C { + count, ok := sub.GetAttemptCount(cpy) + if !ok { + break + } + + if sub.Options().RetryLimit > -1 && count > sub.Options().RetryLimit { + sub.Ack(cpy) + return fmt.Errorf("discarding event %s because the number of attempts %d exceeded the retry limit %d", cpy.Id, count, sub.Options().RetryLimit) + } + + sub.Channel() <- cpy + + sub.SetAttemptCount(count+1, cpy) + } + + return nil +} + +func Ack(sub streams.Subscriber, event streams.Event) func() error { + return func() error { + return sub.Ack(event) + } +} + +func Nack(sub streams.Subscriber, event streams.Event) func() error { + return func() error { + return sub.Nack(event) + } +} diff --git a/streams/subscriber.go b/streams/subscriber.go new file mode 100644 index 0000000..b2e0a64 --- /dev/null +++ b/streams/subscriber.go @@ -0,0 +1,11 @@ +package streams + +type Subscriber interface { + Options() SubscribeOptions + Channel() chan Event + Ack(ev Event) error + Nack(ev Event) error + SetAttemptCount(c int, ev Event) + GetAttemptCount(ev Event) (int, bool) + String() string +}