Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
xBlaz3kx committed Dec 31, 2021
0 parents commit 3876f47
Show file tree
Hide file tree
Showing 14 changed files with 1,320 additions and 0 deletions.
13 changes: 13 additions & 0 deletions ReadMe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# SQSObserver-go

This is an internal library used for observing (polling) SQS queues. OOP, easily configurable and running in goroutines.

Read the [configuration](docs/configuration.md) for queue configuration examples.

## Possible improvements

1. Worker pool and priority queues:
- assign a priority for a queue
- have a limited amount of workers in a pool
- workers poll the queues based on the priority and switch between queues
2. Api changes (semantics)
15 changes: 15 additions & 0 deletions configuration/configuration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package configuration

type (
SQS struct {
PollDuration int64 `fig:"pollDuration" default:"3"`
MessageTimeout int64 `fig:"messageTimeout" default:"2"`
Queues []Observer `fig:"queues"`
}

Observer struct {
Tag string `fig:"tag"`
QueueName string `fig:"queueName" validation:"required"`
PollDuration int64 `fig:"pollDuration" validation:"gte=0"`
}
)
39 changes: 39 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# SQS configuration

## Understanding the internal API and configuration

### The `Observer` and `Publisher` interfaces

The Observer is an abstract interface that listens to one/multiple sources and will output any messages received to a
dedicated channel.

A `Publisher` is also an abstract interface, which consumes messages from a channel or gets a message from a `Send`
interface.

### The `ObserverManager`

The core component of the internal API is a `ObserverManager` that keeps track of all queues, their tags and enables the
central system to access queues at any given time. The manager also contains a default `Publisher` that will send
messages to the appropriate Queue based on the `messageType`.

### Configuration concepts

The configuration describes internal SQS observer configuration. It will group multiple `queue`s based on their tags and
add them to one `Observer`. This improves performance if the service gets a lot of requests from a single queue. If the
queue `Tag` is not specified explicitly, the queue will be added to a `DefaultObserver`, a round-robin queue
listener/`Observer`. The queue names/urls in this case are based on the `messageType`.

## Configuration file

```yaml
sqs:
messageTimeout: 5
pollDuration: 3
queues:
- tag: ""
messageType: ""
pollDuration: 10
- tag: ""
messageType: ""
pollDuration: 10
```
55 changes: 55 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
module github.com/GLCharge/sqsObserver-go

go 1.17

replace (
github.com/GLCharge/sqsObserver-go/configuration => ./configuration
github.com/GLCharge/sqsObserver-go/models/messages => ./models/messages
github.com/GLCharge/sqsObserver-go/models/version => ./models/version
)

require (
github.com/aws/aws-sdk-go v1.42.25
github.com/cenkalti/backoff/v4 v4.1.2
github.com/lorenzodonini/ocpp-go v0.15.0
github.com/ory/dockertest/v3 v3.8.1
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
)

require (
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
github.com/Microsoft/go-winio v0.5.1 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/cli v20.10.11+incompatible // indirect
github.com/docker/docker v20.10.7+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/go-playground/locales v0.12.1 // indirect
github.com/go-playground/universal-translator v0.16.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/gorilla/mux v1.7.3 // indirect
github.com/gorilla/websocket v1.4.1 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/leodido/go-urn v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/moby/term v0.0.0-20201216013528-df9cb8a40635 // indirect
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runc v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.1.1 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
golang.org/x/sys v0.0.0-20210426230700-d19ff857e887 // indirect
gopkg.in/go-playground/validator.v9 v9.30.0 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
63 changes: 63 additions & 0 deletions models/messages/messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package messages

import (
"encoding/json"
"errors"
"github.com/GLCharge/sqsObserver-go/models/version"
"time"
)

// MessageType constants
const (
BootNotification = MessageType("BootNotification")
DisconnectedNotification = MessageType("DisconnectedNotification")
StatusNotification = MessageType("StatusNotification")
AuthTag = MessageType("TagAuthentication")
Heartbeat = MessageType("Heartbeat")
StartTransaction = MessageType("StartTransaction")
StopTransaction = MessageType("StopTransaction")
RemoteStartTransaction = MessageType("RemoteStartTransaction")
RemoteStopTransaction = MessageType("RemoteStopTransaction")
MeterValue = MessageType("MeterValue")
UnlockConnector = MessageType("UnlockConnector")
Reset = MessageType("Reset")
ChangeConfiguration = MessageType("ChangeConfiguration")
ChangeAvailability = MessageType("ChangeAvailability")
DataTransfer = MessageType("DataTransfer")
ClearCache = MessageType("ClearCache")
SetChargingProfile = MessageType("SetChargingProfile")
ClearChargingProfile = MessageType("ClearChargingProfile")
)

var (
ErrUnsupportedVersion = errors.New("unsupported protocol version")
ErrCpIdInvalid = errors.New("cp cannot be an empty string")
)

type (
MessageType string

//ApiMessage represents a generic message used for communication with the service.
ApiMessage struct {
MessageId string `json:"messageId" validate:"required"`
MessageType MessageType `json:"messageType" validate:"required,isSupportedMessageType"`
Timestamp *time.Time `json:"timestamp,omitempty"`
ProtocolVersion version.ProtocolVersion `json:"protocolVersion" validate:"isSupportedProtocol"`
//Error string `json:"error"`
// All payload structs should go in the data attribute
Data interface{} `json:"data,omitempty"`
}
)

func (mt MessageType) String() string {
return string(mt)
}

func (qm *ApiMessage) String() string {
out, err := json.Marshal(&qm)
if err != nil {
return ""
}

return string(out)
}
11 changes: 11 additions & 0 deletions models/version/version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package version

const (
ProtocolVersion15 = ProtocolVersion("1.5")
ProtocolVersion16 = ProtocolVersion("1.6")
ProtocolVersion20 = ProtocolVersion("2.0")
)

type (
ProtocolVersion string
)
177 changes: 177 additions & 0 deletions multiple-queue-observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package sqsObserver_go

import (
"context"
"encoding/json"
"github.com/GLCharge/sqsObserver-go/models/messages"
"github.com/GLCharge/sqsObserver-go/sqs"
"github.com/aws/aws-sdk-go/aws/session"
awsSqs "github.com/aws/aws-sdk-go/service/sqs"
log "github.com/sirupsen/logrus"
"sync"
"time"
)

type (
//MultipleQueueObserver is a concrete implementation of QueueObserver.
MultipleQueueObserver struct {
channel chan messages.ApiMessage
defaultPollDuration int64
timeout int64
queues sync.Map
pollDurations map[string]int64
svc *awsSqs.SQS
}
)

// NewMultipleQueueObserver creates a new observer with multiple queues.
func NewMultipleQueueObserver(session *session.Session) *MultipleQueueObserver {
// Create an SQS service client
mutex := sync.Mutex{}
mutex.Lock()
svc := awsSqs.New(session)
mutex.Unlock()

return &MultipleQueueObserver{
channel: make(chan messages.ApiMessage, 20),
queues: sync.Map{},
defaultPollDuration: 2,
timeout: 1,
pollDurations: make(map[string]int64),
svc: svc,
}
}

// NewMultipleQueueObserverWithChannel creates a new observer with multiple queues with shared channel.
func NewMultipleQueueObserverWithChannel(session *session.Session, messageChan chan messages.ApiMessage) *MultipleQueueObserver {
if messageChan == nil {
return nil
}

// Create an SQS service client
mutex := sync.Mutex{}
mutex.Lock()
svc := awsSqs.New(session)
mutex.Unlock()

return &MultipleQueueObserver{
channel: messageChan,
queues: sync.Map{},
defaultPollDuration: 2,
timeout: 1,
pollDurations: make(map[string]int64),
svc: svc,
}
}

//GetConsumerChannel returns the ApiMessage channel
func (mqo *MultipleQueueObserver) GetConsumerChannel() <-chan messages.ApiMessage {
return mqo.channel
}

//Start starts listening to the SQS for incoming messages as well as listening to the channel.
func (mqo *MultipleQueueObserver) Start(ctx context.Context) {
var (
i = 0
waitTime = mqo.defaultPollDuration
queueUrls []string
)

mqo.queues.Range(func(key, value interface{}) bool {
queueUrls = append(queueUrls, value.(string))
return true
})

log.Debugf("Started the observer with queues: %v", queueUrls)

if len(queueUrls) == 0 {
log.Debug("Observer listening to no queues, returning..")
return
}

Loop:
for {
select {
// Listen to context
case <-ctx.Done():
log.Info("Stopping the multiple queue observer..")
close(mqo.channel)
break Loop
default:
// Reset the queue index
if i > len(queueUrls)-1 {
i = 0
}

if len(queueUrls) > 0 {
log.Tracef("Polling queue: %s", queueUrls[i])

queueUrl := queueUrls[i]
qMessages, err := sqs.GetMessages(mqo.svc, queueUrl, &mqo.timeout, &waitTime)
if err != nil {
log.Errorf("Error getting messages: %v", err)
continue
}

if qMessages != nil {
for _, message := range qMessages.Messages {
mqo.sendMessageToChannel(queueUrl, message)
}
}
i++
}

time.Sleep(30 * time.Millisecond)
}
}
}

func (mqo *MultipleQueueObserver) AddQueuesToObserve(queueNames ...string) {
log.WithField("queues", queueNames).Debug("Adding queues to observe")

if queueNames != nil {
// Get urls for the queues
for _, queueName := range queueNames {
url, err := sqs.GetQueueURL(mqo.svc, queueName)
if err != nil {
continue
}

if url != nil {
mqo.queues.Store(queueName, *url.QueueUrl)
}
}
}
}

func (mqo *MultipleQueueObserver) SetPollDuration(pollDuration int64) {
mqo.defaultPollDuration = pollDuration
}

func (mqo *MultipleQueueObserver) SetTimeout(timeout int64) {
mqo.timeout = timeout
}

func (mqo *MultipleQueueObserver) sendMessageToChannel(queueUrl string, message *awsSqs.Message) {
var (
sqsMessage messages.ApiMessage
messageBody = message.Body
)

if messageBody != nil {
err := json.Unmarshal([]byte(*messageBody), &sqsMessage)
if err != nil {
log.Errorf("Error unmarshalling message: %v", err)
return
}

log.Debugf("Sending message to channel: %v", *messageBody)
mqo.channel <- sqsMessage

// todo potential issue while scaling services
err = sqs.DeleteMessage(mqo.svc, &queueUrl, message.ReceiptHandle)
if err != nil {
log.Warnf("Couldn't delete the message from queue: %v", err)
}
}
}
Loading

0 comments on commit 3876f47

Please sign in to comment.