Skip to content

Commit

Permalink
Allow more SQS settings to be set by env var (#20)
Browse files Browse the repository at this point in the history
* Allow more SQS settings to be set by env var

* Move sqs config under main config type

* Add yaml tags
  • Loading branch information
mlarraz authored Nov 23, 2022
1 parent 410409f commit b910652
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 5 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ queue:
None of this information is actually an example of anything other than the
strucure of the file, so if you copy paste it you'll probably be disappointed.
### Environment Variables
A few optional settings can also be configured by environment variable:
* `SCOUT_SQS_MAX_NUMBER_OF_MESSAGES` - Max number of SQS messages to fetch at once
* `SCOUT_SQS_WAIT_TIME_SECONDS` - Max seconds to wait for an SQS message per poll
* `SCOUT_SQS_VISIBILITY_TIMEOUT` - How long to hide an SQS message after receiving it

## Versioning

Scout uses tagged commits that are compatible with go modules. The first module
Expand Down
8 changes: 8 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Config struct {
Redis RedisConfig `yaml:"redis"`
AWS AWSConfig `yaml:"aws"`
Queue QueueConfig `yaml:"queue"`
SQS SQSConfig
}

// RedisConfig is a nested config that contains the necessary parameters to
Expand All @@ -38,6 +39,13 @@ type QueueConfig struct {
Topics map[string]string `yaml:"topics"`
}

// SQSConfig is a nested config meant to be passed directly to the SQS client
type SQSConfig struct {
maxNumberOfMessages int64 `yaml:"max_number_of_messages"`
waitTimeSeconds int64 `yaml:"wait_time_seconds"`
visibilityTimeout int64 `yaml:"visibility_timeout"`
}

// ReadConfig reads from a file with the given name and returns a config or
// an error if the file was unable to be parsed. It does no error checking
// as far as required fields.
Expand Down
18 changes: 18 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"os/signal"
"strconv"
"syscall"
"time"

Expand Down Expand Up @@ -89,6 +90,23 @@ func runApp(ctx *cli.Context) error {
return cli.NewExitError("Failed to parse config file", 1)
}

maxNumberOfMessages, _ := strconv.ParseInt(os.Getenv("SCOUT_SQS_MAX_NUMBER_OF_MESSAGES"), 10, 64)
if maxNumberOfMessages != 0 {
config.SQS.maxNumberOfMessages = maxNumberOfMessages
} else {
config.SQS.maxNumberOfMessages = 10
}

waitTimeSeconds, _ := strconv.ParseInt(os.Getenv("SCOUT_SQS_WAIT_TIME_SECONDS"), 10, 64)
if waitTimeSeconds != 0 {
config.SQS.waitTimeSeconds = waitTimeSeconds
}

visibilityTimeout, _ := strconv.ParseInt(os.Getenv("SCOUT_SQS_VISIBILITY_TIMEOUT"), 10, 64)
if visibilityTimeout != 0 {
config.SQS.visibilityTimeout = visibilityTimeout
}

queue, err := NewQueue(config)
if err != nil {
return cli.NewExitError(fmt.Sprintf("Initialization error: %s", err.Error()), 1)
Expand Down
2 changes: 1 addition & 1 deletion queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewQueue(config *Config) (Queue, error) {
queue := new(queue)
var err error

queue.SQSClient, err = NewAWSSQSClient(config.AWS, config.Queue.Name)
queue.SQSClient, err = NewAWSSQSClient(config.AWS, config.Queue.Name, config.SQS)
if err != nil {
return nil, err
}
Expand Down
13 changes: 9 additions & 4 deletions sqs_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,22 @@ type Message struct {
type sdkClient struct {
service *sqs.SQS
url string
SQSConfig
}

// NewAWSSQSClient creates an SQS client that talks to AWS on the given queue
func NewAWSSQSClient(conf AWSConfig, queueName string) (SQSClient, error) {
func NewAWSSQSClient(conf AWSConfig, queueName string, sqsConf SQSConfig) (SQSClient, error) {
creds := credentials.NewStaticCredentials(conf.AccessKey, conf.SecretKey, "")
sess, err := session.NewSession(&aws.Config{Region: formatRegion(conf.Region), Credentials: creds})

if err != nil {
return nil, err
}

client := new(sdkClient)
client.service = sqs.New(sess)
client := &sdkClient{
service: sqs.New(sess),
SQSConfig: sqsConf,
}

resp, err := client.service.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: &queueName,
Expand All @@ -57,7 +60,9 @@ func NewAWSSQSClient(conf AWSConfig, queueName string) (SQSClient, error) {
func (s *sdkClient) Fetch() ([]Message, error) {
res, err := s.service.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: &s.url,
MaxNumberOfMessages: aws.Int64(10),
MaxNumberOfMessages: &s.maxNumberOfMessages,
WaitTimeSeconds: &s.waitTimeSeconds,
VisibilityTimeout: &s.visibilityTimeout,
})
if err != nil {
return nil, err
Expand Down

0 comments on commit b910652

Please sign in to comment.