Skip to content

Commit

Permalink
feat: add subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
w-h-a committed Aug 24, 2024
1 parent d0d7a8d commit 1850e33
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 44 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
| Package | Examples | Use Case |
| --------- | -------------------- | -------------------------- |
| api | http | build gateway servers |
| broker | sqs | asynchronous communication |
| broker | sns+sqs | asynchronous communication |
| client | grpc, http | synchronous communication |
| runtime | kubernetes | service info |
| security | jwts, nacl, autocert | tokens, secrets, and certs |
Expand Down
2 changes: 1 addition & 1 deletion broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ var (
type Broker interface {
Options() BrokerOptions
Publish(data interface{}, options PublishOptions) error
Subscribe(callback func([]byte) error, options SubscribeOptions)
Subscribe(callback func([]byte) error, options SubscribeOptions) Subscriber
String() string
}
16 changes: 2 additions & 14 deletions broker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ type BrokerOption func(o *BrokerOptions)

type BrokerOptions struct {
Nodes []string
Topic string
Group string
PublishOptions PublishOptions
SubscribeOptions SubscribeOptions
Context context.Context
Expand All @@ -19,18 +17,6 @@ func BrokerWithNodes(addrs ...string) BrokerOption {
}
}

func BrokerWithTopic(topic string) BrokerOption {
return func(o *BrokerOptions) {
o.Topic = topic
}
}

func BrokerWithGroup(group string) BrokerOption {
return func(o *BrokerOptions) {
o.Group = group
}
}

func BrokerWithPublishOptions(options PublishOptions) BrokerOption {
return func(o *BrokerOptions) {
o.PublishOptions = options
Expand Down Expand Up @@ -64,11 +50,13 @@ func NewBrokerOptions(opts ...BrokerOption) BrokerOptions {
type PublishOption func(o *PublishOptions)

type PublishOptions struct {
Topic string
Context context.Context
}

type SubscribeOption func(o *SubscribeOptions)

type SubscribeOptions struct {
Group string
Context context.Context
}
14 changes: 7 additions & 7 deletions broker/snssqs/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (c *snsClient) ProduceToTopic(bs []byte, topic string) error {
}

type SqsClient interface {
ConsumeFromGroup(callback func([]byte) error, group string, options broker.SubscribeOptions)
ConsumeFromGroup(sub broker.Subscriber)
}

type sqsClient struct {
Expand All @@ -41,16 +41,16 @@ type sqsClient struct {
waitTimeSeconds int32
}

func (c *sqsClient) ConsumeFromGroup(callback func([]byte) error, group string, options broker.SubscribeOptions) {
func (c *sqsClient) ConsumeFromGroup(sub broker.Subscriber) {
result, err := c.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{
QueueUrl: aws.String(group),
QueueUrl: aws.String(sub.Options().Group),
MaxNumberOfMessages: 1,
VisibilityTimeout: c.visibilityTimeout,
WaitTimeSeconds: c.waitTimeSeconds,
MessageAttributeNames: []string{"All"},
})
if err != nil {
log.Errorf("failed to receive sqs message from group %s: %s", group, err.Error())
log.Errorf("failed to receive sqs message from group %s: %s", sub.Options().Group, err.Error())
return
}

Expand All @@ -60,12 +60,12 @@ func (c *sqsClient) ConsumeFromGroup(callback func([]byte) error, group string,

for _, msg := range result.Messages {
body := msg.Body
if err := callback([]byte(*body)); err != nil {
log.Errorf("failed to handle message from group %s: %s", group, err)
if err := sub.Handler([]byte(*body)); err != nil {
log.Errorf("failed to handle message from group %s: %s", sub.Options().Group, err)
} else {
msgHandle := msg.ReceiptHandle
c.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: aws.String(group),
QueueUrl: aws.String(sub.Options().Group),
ReceiptHandle: msgHandle,
})
}
Expand Down
27 changes: 22 additions & 5 deletions broker/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/google/uuid"
"github.com/w-h-a/pkg/broker"
"github.com/w-h-a/pkg/telemetry/log"
)
Expand All @@ -33,18 +34,34 @@ func (b *snssqs) Publish(data interface{}, options broker.PublishOptions) error
return err
}

if err := b.snsClient.ProduceToTopic(bs, b.options.Topic); err != nil {
if err := b.snsClient.ProduceToTopic(bs, options.Topic); err != nil {
return err
}

return nil
}

func (b *snssqs) Subscribe(callback func([]byte) error, options broker.SubscribeOptions) {
for {
b.sqsClient.ConsumeFromGroup(callback, b.options.Group, options)
time.Sleep(time.Second)
func (b *snssqs) Subscribe(callback func([]byte) error, options broker.SubscribeOptions) broker.Subscriber {
sub := &subscriber{
options: options,
id: uuid.New().String(),
handler: callback,
exit: make(chan struct{}),
}

go func() {
for {
select {
case <-sub.exit:
return
default:
b.sqsClient.ConsumeFromGroup(sub)
time.Sleep(time.Second)
}
}
}()

return sub
}

func (b *snssqs) String() string {
Expand Down
75 changes: 59 additions & 16 deletions sidecar/custom/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@ import (
"encoding/json"
"fmt"
"strconv"
"sync"
"time"

"github.com/w-h-a/pkg/broker"
"github.com/w-h-a/pkg/client"
"github.com/w-h-a/pkg/sidecar"
"github.com/w-h-a/pkg/store"
"github.com/w-h-a/pkg/telemetry/log"
)

type customSidecar struct {
options sidecar.SidecarOptions
options sidecar.SidecarOptions
subscribers map[string]broker.Subscriber
mtx sync.RWMutex
}

func (s *customSidecar) Options() sidecar.SidecarOptions {
Expand Down Expand Up @@ -79,22 +83,59 @@ func (s *customSidecar) ReadEventsFromBroker(brokerId, eventName string) {
return
}

go func() {
bk.Subscribe(func(b []byte) error {
var body interface{}
if err := json.Unmarshal(b, &body); err != nil {
return err
}
s.mtx.RLock()

event := &sidecar.Event{
Data: body,
EventName: eventName,
CreatedAt: time.Now(),
}
_, ok = s.subscribers[brokerId]
if ok {
log.Warnf("a subscriber for broker %s was already found", brokerId)
s.mtx.RUnlock()
return
}

s.mtx.RUnlock()

sub := bk.Subscribe(func(b []byte) error {
var body interface{}
if err := json.Unmarshal(b, &body); err != nil {
return err
}

event := &sidecar.Event{
Data: body,
EventName: eventName,
CreatedAt: time.Now(),
}

return s.postEventToApp(event)
}, bk.Options().SubscribeOptions)

s.mtx.Lock()
defer s.mtx.Unlock()

s.subscribers[brokerId] = sub
}

func (s *customSidecar) UnsubscribeFromBroker(brokerId string) error {
s.mtx.RLock()

sub, ok := s.subscribers[brokerId]
if !ok {
s.mtx.RUnlock()
return nil
}

s.mtx.RUnlock()

return s.postEventToApp(event)
}, bk.Options().SubscribeOptions)
}()
if err := sub.Unsubscribe(); err != nil {
return err
}

s.mtx.Lock()
defer s.mtx.Unlock()

delete(s.subscribers, brokerId)

return nil
}

func (s *customSidecar) String() string {
Expand Down Expand Up @@ -184,7 +225,9 @@ func NewSidecar(opts ...sidecar.SidecarOption) sidecar.Sidecar {
options := sidecar.NewSidecarOptions(opts...)

s := &customSidecar{
options: options,
options: options,
subscribers: map[string]broker.Subscriber{},
mtx: sync.RWMutex{},
}

return s
Expand Down
1 change: 1 addition & 0 deletions sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ type Sidecar interface {
SaveStateToStore(store string, state []*store.Record) error
RetrieveStateFromStore(store, key string) ([]*store.Record, error)
ReadEventsFromBroker(broker, eventName string)
UnsubscribeFromBroker(broker string) error
String() string
}

0 comments on commit 1850e33

Please sign in to comment.