Skip to content

Commit

Permalink
feat: pub sub
Browse files Browse the repository at this point in the history
  • Loading branch information
w-h-a committed Aug 29, 2024
1 parent 0e818a0 commit 60da655
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 118 deletions.
8 changes: 4 additions & 4 deletions broker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ type BrokerOption func(o *BrokerOptions)

type BrokerOptions struct {
Nodes []string
PublishOptions PublishOptions
SubscribeOptions SubscribeOptions
PublishOptions *PublishOptions
SubscribeOptions *SubscribeOptions
Context context.Context
}

Expand All @@ -17,13 +17,13 @@ func BrokerWithNodes(addrs ...string) BrokerOption {
}
}

func BrokerWithPublishOptions(options PublishOptions) BrokerOption {
func BrokerWithPublishOptions(options *PublishOptions) BrokerOption {
return func(o *BrokerOptions) {
o.PublishOptions = options
}
}

func BrokerWithSubscribeOptions(options SubscribeOptions) BrokerOption {
func BrokerWithSubscribeOptions(options *SubscribeOptions) BrokerOption {
return func(o *BrokerOptions) {
o.SubscribeOptions = options
}
Expand Down
3 changes: 2 additions & 1 deletion broker/snssqs/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ type SqsClient interface {

type sqsClient struct {
*sqs.Client
queueUrl *string
visibilityTimeout int32
waitTimeSeconds int32
}

func (c *sqsClient) ConsumeFromGroup(sub broker.Subscriber) {
result, err := c.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{
QueueUrl: aws.String(sub.Options().Group),
QueueUrl: c.queueUrl,
MaxNumberOfMessages: 1,
VisibilityTimeout: c.visibilityTimeout,
WaitTimeSeconds: c.waitTimeSeconds,
Expand Down
40 changes: 27 additions & 13 deletions broker/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sqs"
Expand Down Expand Up @@ -77,7 +78,7 @@ func (b *snssqs) configure() error {
b.sqsClient = sqs
}

if b.snsClient != nil && b.sqsClient != nil {
if b.snsClient != nil || b.sqsClient != nil {
return nil
}

Expand All @@ -86,21 +87,34 @@ func (b *snssqs) configure() error {
return err
}

b.snsClient = &snsClient{sns.NewFromConfig(cfg)}

visibilityTimeout := defaultVisibilityTimeout

waitTimeSeconds := defaultWaitSeconds

if timeout, ok := GetVisibilityTimeoutFromContext(b.options.SubscribeOptions.Context); ok {
visibilityTimeout = timeout
if b.options.PublishOptions != nil {
b.snsClient = &snsClient{sns.NewFromConfig(cfg)}
}

if waitTime, ok := GetWaitTimeSecondsFromContext(b.options.SubscribeOptions.Context); ok {
waitTimeSeconds = waitTime
}
if b.options.SubscribeOptions != nil {
visibilityTimeout := defaultVisibilityTimeout

b.sqsClient = &sqsClient{sqs.NewFromConfig(cfg), visibilityTimeout, waitTimeSeconds}
waitTimeSeconds := defaultWaitSeconds

if timeout, ok := GetVisibilityTimeoutFromContext(b.options.SubscribeOptions.Context); ok {
visibilityTimeout = timeout
}

if waitTime, ok := GetWaitTimeSecondsFromContext(b.options.SubscribeOptions.Context); ok {
waitTimeSeconds = waitTime
}

client := sqs.NewFromConfig(cfg)

url, err := client.GetQueueUrl(context.Background(), &sqs.GetQueueUrlInput{
QueueName: aws.String(b.options.SubscribeOptions.Group),
})
if err != nil {
return err
}

b.sqsClient = &sqsClient{client, url.QueueUrl, visibilityTimeout, waitTimeSeconds}
}

return nil
}
Expand Down
113 changes: 24 additions & 89 deletions sidecar/custom/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,16 @@ func (s *customSidecar) RemoveStateFromStore(storeId, key string) error {
return nil
}

func (s *customSidecar) OnEventPublished(event *sidecar.Event) error {
var err error
func (s *customSidecar) WriteEventToBroker(event *sidecar.Event) error {
if len(event.To) == 0 {
return nil
}

if len(event.To) > 0 {
err = s.actOnEventFromApp(event)
} else {
// TODO: rm
err = s.postEventToApp(event)
if err := s.actOnEventFromService(event); err != nil {
return err
}

return err
return nil
}

func (s *customSidecar) ReadEventsFromBroker(brokerId string) {
Expand Down Expand Up @@ -134,8 +133,8 @@ func (s *customSidecar) ReadEventsFromBroker(brokerId string) {
CreatedAt: time.Now(),
}

return s.postEventToApp(event)
}, bk.Options().SubscribeOptions)
return s.sendEventToService(event)
}, *bk.Options().SubscribeOptions)

s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down Expand Up @@ -170,13 +169,7 @@ func (s *customSidecar) String() string {
return "custom"
}

func (s *customSidecar) actOnEventFromApp(event *sidecar.Event) error {
if len(event.State.Records) > 0 {
if err := s.SaveStateToStore(&event.State); err != nil {
return err
}
}

func (s *customSidecar) actOnEventFromService(event *sidecar.Event) error {
if len(event.To) == 0 {
return nil
}
Expand Down Expand Up @@ -216,99 +209,41 @@ func (s *customSidecar) sendEventToTargetsSequentially(event *sidecar.Event) err

func (s *customSidecar) sendEventToTarget(target string, event *sidecar.Event) error {
bk, ok := s.options.Brokers[target]
if ok {
if err := bk.Publish(event.Data, bk.Options().PublishOptions); err != nil {
return err
}
} else {
// TODO: rm
name := fmt.Sprintf("%s-action", target)

url := fmt.Sprintf("%s:%s", name, s.options.HttpPort.Port)

newEvent := &sidecar.Event{
EventName: event.EventName,
Data: event.Data,
CreatedAt: time.Now(),
}

if _, err := s.sendEventViaHttp(name, name, s.options.HttpPort.Port, "publish", url, newEvent); err != nil {
return err
}
}

return nil
}

func (s *customSidecar) postEventToApp(event *sidecar.Event) error {
var rsp *sidecar.Event
var err error

url := fmt.Sprintf("%s:%s", s.options.ServiceName, s.options.ServicePort.Port)

if s.options.ServicePort.Protocol == "rpc" {
rsp, err = s.sendEventViaRpc(s.options.ServiceName, s.options.ServiceName, s.options.ServicePort.Port, event.EventName, url, event)
if err != nil {
return err
}
} else {
rsp, err = s.sendEventViaHttp(s.options.ServiceName, s.options.ServiceName, s.options.ServicePort.Port, event.EventName, url, event)
if err != nil {
return err
}
}

if rsp == nil {
if !ok {
return nil
}

if err := s.actOnEventFromApp(rsp); err != nil {
if err := bk.Publish(event.Data, *bk.Options().PublishOptions); err != nil {
return err
}

return nil
}

// TODO: if we bind to the service at startup, we could have one client and only one of these methods
func (s *customSidecar) sendEventToService(event *sidecar.Event) error {
url := fmt.Sprintf("%s:%s", s.options.ServiceName, s.options.ServicePort.Port)

func (s *customSidecar) sendEventViaHttp(namespace, name, port, endpoint, baseUrl string, event *sidecar.Event) (*sidecar.Event, error) {
p, _ := strconv.Atoi(port)
p, _ := strconv.Atoi(s.options.ServicePort.Port)

req := s.options.HttpClient.NewRequest(
client.RequestWithNamespace(namespace),
client.RequestWithName(name),
req := s.options.Client.NewRequest(
client.RequestWithNamespace(s.options.ServiceName),
client.RequestWithName(s.options.ServiceName),
client.RequestWithPort(p),
client.RequestWithMethod(endpoint),
client.RequestWithMethod(event.EventName),
client.RequestWithUnmarshaledRequest(event),
)

rsp := &sidecar.Event{}

if err := s.options.HttpClient.Call(context.Background(), req, rsp, client.CallWithAddress(baseUrl)); err != nil {
return nil, err
if err := s.options.Client.Call(context.Background(), req, rsp, client.CallWithAddress(url)); err != nil {
return err
}

return rsp, nil
}

func (s *customSidecar) sendEventViaRpc(namespace, name, port, method, baseUrl string, event *sidecar.Event) (*sidecar.Event, error) {
p, _ := strconv.Atoi(port)

req := s.options.RpcClient.NewRequest(
client.RequestWithNamespace(namespace),
client.RequestWithName(name),
client.RequestWithPort(p),
client.RequestWithMethod(method),
client.RequestWithUnmarshaledRequest(event),
)

rsp := &sidecar.Event{}

if err := s.options.RpcClient.Call(context.Background(), req, rsp, client.CallWithAddress(baseUrl)); err != nil {
return nil, err
if err := s.actOnEventFromService(rsp); err != nil {
return err
}

return rsp, nil
return nil
}

func NewSidecar(opts ...sidecar.SidecarOption) sidecar.Sidecar {
Expand Down
13 changes: 3 additions & 10 deletions sidecar/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ type SidecarOptions struct {
HttpPort Port
RpcPort Port
ServicePort Port
HttpClient client.Client
RpcClient client.Client
Client client.Client
Stores map[string]store.Store
Brokers map[string]broker.Broker
Context context.Context
Expand Down Expand Up @@ -51,15 +50,9 @@ func SidecarWithServicePort(p Port) SidecarOption {
}
}

func SidecarWithHttpClient(c client.Client) SidecarOption {
func SidecarWithClient(c client.Client) SidecarOption {
return func(o *SidecarOptions) {
o.HttpClient = c
}
}

func SidecarWithRpcClient(c client.Client) SidecarOption {
return func(o *SidecarOptions) {
o.RpcClient = c
o.Client = c
}
}

Expand Down
2 changes: 1 addition & 1 deletion sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type Sidecar interface {
SaveStateToStore(state *State) error
RetrieveStateFromStore(store, key string) ([]*store.Record, error)
RemoveStateFromStore(store, key string) error
OnEventPublished(event *Event) error
WriteEventToBroker(event *Event) error
ReadEventsFromBroker(broker string)
UnsubscribeFromBroker(broker string) error
String() string
Expand Down

0 comments on commit 60da655

Please sign in to comment.