diff --git a/pkg/config/types.go b/pkg/config/types.go index ba924c311..8cd5efe8b 100644 --- a/pkg/config/types.go +++ b/pkg/config/types.go @@ -17,6 +17,8 @@ type Config struct { Region string Kubeconfig string MasterURL string + QueueURL string + QueueARN string AWSSession *session.Session AWSClientset awsclient.ServiceoperatorV1alpha1Interface KubeClientset kubernetes.Interface diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index ee05ed6f7..06b2cd252 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -1,74 +1,19 @@ package queue import ( + "context" "encoding/json" + "os" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sns" "github.com/aws/aws-sdk-go/service/sqs" awsclient "github.com/awslabs/aws-service-operator/pkg/client/clientset/versioned/typed/service-operator.aws/v1alpha1" "github.com/awslabs/aws-service-operator/pkg/config" - "github.com/awslabs/aws-service-operator/pkg/helpers" - "os" - "strings" + "github.com/awslabs/aws-service-operator/pkg/queuemanager" ) -// HandlerFunc allows you to define a custom function for when a message is stored -type HandlerFunc func(config *config.Config, msg *MessageBody) error - -// HandleMessage will stub the handler for processing messages -func (f HandlerFunc) HandleMessage(config *config.Config, msg *MessageBody) error { - return f(config, msg) -} - -// Handler allows a custom function to be passed -type Handler interface { - HandleMessage(config *config.Config, msg *MessageBody) error -} - -// ParseMessage will take the message attribute and make it readable -func (m *MessageBody) ParseMessage() error { - m.Updatable = false - resp := make(map[string]string) - items := strings.Split(m.Message, "\n") - for _, item := range items { - x := strings.Split(item, "=") - key := x[0] - if key != "" { - s := x[1] - s = s[1 : len(s)-1] - resp[key] = s - } - } - m.ParsedMessage = resp - - var resourceProperties ResourceProperties - if resp["ResourceProperties"] != "null" { - err := json.Unmarshal([]byte(resp["ResourceProperties"]), &resourceProperties) - if err != nil { - return err - } - m.ResourceProperties = resourceProperties - for _, tag := range resourceProperties.Tags { - switch tag.Key { - case "Namespace": - m.Namespace = tag.Value - case "ResourceName": - m.ResourceName = tag.Value - } - } - if m.Namespace != "" && m.ResourceName != "" { - m.Updatable = true - } - } - return nil -} - -// IsComplete returns a simple status instead of the raw CFT resp -func (m *MessageBody) IsComplete() bool { - return helpers.IsStackComplete(m.ParsedMessage["ResourceStatus"], true) -} - // New will initialize the Queue object for watching func New(config *config.Config, awsclientset awsclient.ServiceoperatorV1alpha1Interface, timeout int) *Queue { return &Queue{ @@ -78,21 +23,11 @@ func New(config *config.Config, awsclientset awsclient.ServiceoperatorV1alpha1In } } -// Register will create all the affilate resources -func (q *Queue) Register(name string, obj interface{}) (topicARN string, queueURL string, queueAttrs map[string]*string, subARN string) { - config := q.config - logger := config.Logger - keyname := keyName(config.ClusterName, name) - svc := sns.New(config.AWSSession) - topicInputs := sns.CreateTopicInput{Name: aws.String(keyname)} - output, err := svc.CreateTopic(&topicInputs) - if err != nil { - logger.Errorf("Error creating SNS Topic with error '%s'", err.Error()) - } - topicARN = *output.TopicArn - logger.Infof("Created sns topic '%s'", topicARN) - +// RegisterQueue wkll create the Queue so it is accessible to SNS. +func RegisterQueue(config *config.Config, name string) (queueURL, queueARN string, manager *queuemanager.QueueManager, err error) { + manager = queuemanager.New() sqsSvc := sqs.New(config.AWSSession) + keyname := keyName(config.ClusterName, name) queueInputs := sqs.CreateQueueInput{ QueueName: aws.String(keyname), Attributes: map[string]*string{ @@ -102,14 +37,10 @@ func (q *Queue) Register(name string, obj interface{}) (topicARN string, queueUR } sqsOutput, err := sqsSvc.CreateQueue(&queueInputs) if err != nil { - logger.Errorf("Error creating SQS Queue with error '%s'", err.Error()) + return "", "", manager, err } queueURL = *sqsOutput.QueueUrl - logger.Infof("Created sqs queue '%s'", queueURL) - q.queueURL = queueURL - // Get Queue Information - // This will later happen with the CRD cft subscription queueQueryInputs := sqs.GetQueueAttributesInput{ QueueUrl: aws.String(queueURL), AttributeNames: []*string{ @@ -118,29 +49,46 @@ func (q *Queue) Register(name string, obj interface{}) (topicARN string, queueUR } sqsQueueOutput, err := sqsSvc.GetQueueAttributes(&queueQueryInputs) if err != nil { - logger.Errorf("Error getting SQS Information with error '%s'", err.Error()) + return "", "", manager, err } - queueAttrs = sqsQueueOutput.Attributes + queueARN = *sqsQueueOutput.Attributes["QueueArn"] + return queueURL, queueARN, manager, nil +} - policy := newPolicy(*queueAttrs["QueueArn"], keyname, topicARN) - policyb, err := json.Marshal(policy) +// Subscribe will listen to the global queue and distribute messages +func Subscribe(config *config.Config, manager *queuemanager.QueueManager, ctx context.Context) { + logger := config.Logger + sess, err := session.NewSession(&aws.Config{ + Region: aws.String(config.Region), + }) if err != nil { - logger.WithError(err).Error("error encoding policy to json") + logger.WithError(err).Error("error creating AWS session") + os.Exit(1) } - addPermInput := &sqs.SetQueueAttributesInput{ - QueueUrl: aws.String(queueURL), - Attributes: map[string]*string{ - "Policy": aws.String(string(policyb)), - }} - _, err = sqsSvc.SetQueueAttributes(addPermInput) + + svc := sqs.New(sess) + + process(config, svc, manager, ctx) +} + +// Register will create all the affilate resources +func (q *Queue) Register(name string) (topicARN string, subARN string) { + config := q.config + logger := config.Logger + keyname := keyName(config.ClusterName, name) + svc := sns.New(config.AWSSession) + topicInputs := sns.CreateTopicInput{Name: aws.String(keyname)} + output, err := svc.CreateTopic(&topicInputs) if err != nil { - logger.WithError(err).Error("error setting queue policy") + logger.Errorf("Error creating SNS Topic with error '%s'", err.Error()) } + topicARN = *output.TopicArn + logger.Infof("Created sns topic '%s'", topicARN) // Move to somewhere else. subInput := sns.SubscribeInput{ TopicArn: aws.String(topicARN), - Endpoint: aws.String(*queueAttrs["QueueArn"]), + Endpoint: aws.String(config.QueueARN), Protocol: aws.String("sqs"), } subOutput, err := svc.Subscribe(&subInput) @@ -153,54 +101,61 @@ func (q *Queue) Register(name string, obj interface{}) (topicARN string, queueUR return } -// StartWatch will start a go routine to watch for new events to store in etcd -func (q *Queue) StartWatch(h Handler, stopCh <-chan struct{}) { - config := q.config - logger := config.Logger - sess, err := session.NewSession(&aws.Config{ - Region: aws.String(config.Region), - }) +func SetQueuePolicy(config *config.Config, manager *queuemanager.QueueManager) error { + sqsSvc := sqs.New(config.AWSSession) + topicARNs := manager.Keys() + + policy := newPolicy(config.QueueARN, config.ClusterName, topicARNs) + policyb, err := json.Marshal(policy) if err != nil { - logger.WithError(err).Infof("error creating AWS session") - os.Exit(1) + return err } - svc := sqs.New(sess) - - process(q, svc, h, stopCh) + addPermInput := &sqs.SetQueueAttributesInput{ + QueueUrl: aws.String(config.QueueURL), + Attributes: map[string]*string{ + "Policy": aws.String(string(policyb)), + }} + _, err = sqsSvc.SetQueueAttributes(addPermInput) + if err != nil { + return err + } + return nil } func keyName(clusterName string, name string) string { return clusterName + "-" + name } -func newPolicy(queueARN string, keyname string, topicARN string) Policy { - return Policy{ - Version: "2012-10-17", - ID: queueARN + "/" + keyname + "-policy", - Statement: []Statement{ - Statement{ - Sid: keyname, - Effect: "Allow", - Principal: "*", - Action: []string{"SQS:ReceiveMessage", "SQS:SendMessage"}, - Resource: queueARN, - Condition: Condition{ - ArnEquals: ArnEquals{ - AwsSourceArn: topicARN, - }, +func newPolicy(queueARN string, name string, topicARNs []string) Policy { + statements := []Statement{} + for _, topicARN := range topicARNs { + statements = append(statements, Statement{ + Sid: topicARN + " statment", + Effect: "Allow", + Principal: "*", + Action: []string{"SQS:ReceiveMessage", "SQS:SendMessage"}, + Resource: queueARN, + Condition: Condition{ + ArnEquals: ArnEquals{ + AwsSourceArn: topicARN, }, }, - }, + }) + } + + return Policy{ + Version: "2012-10-17", + ID: queueARN + "/" + name + "-policy", + Statement: statements, } } -func process(q *Queue, svc *sqs.SQS, h Handler, stopCh <-chan struct{}) error { - config := q.config +func process(config *config.Config, svc *sqs.SQS, manager *queuemanager.QueueManager, ctx context.Context) error { logger := config.Logger for { result, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{ - QueueUrl: aws.String(q.queueURL), + QueueUrl: aws.String(config.QueueURL), AttributeNames: aws.StringSlice([]string{ "SentTimestamp", }), @@ -208,7 +163,7 @@ func process(q *Queue, svc *sqs.SQS, h Handler, stopCh <-chan struct{}) error { MessageAttributeNames: aws.StringSlice([]string{ "All", }), - WaitTimeSeconds: aws.Int64(q.timeout), + WaitTimeSeconds: aws.Int64(10), }) if err != nil { logger.WithError(err).Error("unable to receive messages from queue") @@ -216,7 +171,7 @@ func process(q *Queue, svc *sqs.SQS, h Handler, stopCh <-chan struct{}) error { } for _, message := range result.Messages { - mb := &MessageBody{} + mb := &queuemanager.MessageBody{} err := json.Unmarshal([]byte(*message.Body), mb) if err != nil { logger.WithError(err).Error("error parsing message body") @@ -224,13 +179,18 @@ func process(q *Queue, svc *sqs.SQS, h Handler, stopCh <-chan struct{}) error { mb.ParseMessage() logger.Debugf("%+v", mb) + h, ok := manager.Get(mb.TopicARN) + if !ok { + logger.Errorf("error missing handler function for topic %s", mb.TopicARN) + } + err = h.HandleMessage(config, mb) if err != nil { logger.WithError(err).Error("error processing message") } logger.Debugf("stackID %v updated status to %v", mb.ParsedMessage["StackId"], mb.ParsedMessage["ResourceStatus"]) _, err = svc.DeleteMessage(&sqs.DeleteMessageInput{ - QueueUrl: aws.String(q.queueURL), + QueueUrl: aws.String(config.QueueURL), ReceiptHandle: message.ReceiptHandle, }) @@ -239,11 +199,5 @@ func process(q *Queue, svc *sqs.SQS, h Handler, stopCh <-chan struct{}) error { os.Exit(1) } } - - select { - case <-stopCh: - os.Exit(1) - default: - } } } diff --git a/pkg/queue/types.go b/pkg/queue/types.go index af06d7caa..341b816d3 100644 --- a/pkg/queue/types.go +++ b/pkg/queue/types.go @@ -39,26 +39,3 @@ type Condition struct { type ArnEquals struct { AwsSourceArn string `json:"aws:SourceArn"` } - -// MessageBody will parse the message from the Body of SQS -type MessageBody struct { - Type string `json:"Type"` - TopicArn string `json:"TopicArn"` - Message string `json:"Message"` - ParsedMessage map[string]string - Namespace string - ResourceName string - ResourceProperties ResourceProperties - Updatable bool -} - -// ResourceProperties will wrap the ResourceProperties object -type ResourceProperties struct { - Tags []Tag `json:"Tags"` -} - -// Tag represents a Tag -type Tag struct { - Key string `json:"Key"` - Value string `json:"Value"` -} diff --git a/pkg/queuemanager/queuemanager.go b/pkg/queuemanager/queuemanager.go new file mode 100644 index 000000000..53f741189 --- /dev/null +++ b/pkg/queuemanager/queuemanager.go @@ -0,0 +1,91 @@ +package queuemanager + +import ( + "encoding/json" + "github.com/awslabs/aws-service-operator/pkg/config" + "github.com/awslabs/aws-service-operator/pkg/helpers" + "strings" +) + +// New will return the QueueManager +func New() *QueueManager { + return &QueueManager{ + handlers: make(map[string]Handler), + } +} + +// Get will return the handler func +func (q *QueueManager) Get(name string) (handler Handler, ok bool) { + q.lock.RLock() + defer q.lock.RUnlock() + if handler, ok = q.handlers[name]; ok { + return handler, ok + } + return handler, false +} + +// Add will add a new handler func +func (q *QueueManager) Add(name string, handlerFunc Handler) { + q.lock.Lock() + defer q.lock.Unlock() + q.handlers[name] = handlerFunc +} + +// Keys will return the list of topic ARNs +func (q *QueueManager) Keys() []string { + q.lock.RLock() + defer q.lock.RUnlock() + keys := []string{} + for key, _ := range q.handlers { + keys = append(keys, key) + } + return keys +} + +// HandleMessage will stub the handler for processing messages +func (f HandlerFunc) HandleMessage(config *config.Config, msg *MessageBody) error { + return f(config, msg) +} + +// ParseMessage will take the message attribute and make it readable +func (m *MessageBody) ParseMessage() error { + m.Updatable = false + resp := make(map[string]string) + items := strings.Split(m.Message, "\n") + for _, item := range items { + x := strings.Split(item, "=") + key := x[0] + if key != "" { + s := x[1] + s = s[1 : len(s)-1] + resp[key] = s + } + } + m.ParsedMessage = resp + + var resourceProperties ResourceProperties + if resp["ResourceProperties"] != "null" { + err := json.Unmarshal([]byte(resp["ResourceProperties"]), &resourceProperties) + if err != nil { + return err + } + m.ResourceProperties = resourceProperties + for _, tag := range resourceProperties.Tags { + switch tag.Key { + case "Namespace": + m.Namespace = tag.Value + case "ResourceName": + m.ResourceName = tag.Value + } + } + if m.Namespace != "" && m.ResourceName != "" { + m.Updatable = true + } + } + return nil +} + +// IsComplete returns a simple status instead of the raw CFT resp +func (m *MessageBody) IsComplete() bool { + return helpers.IsStackComplete(m.ParsedMessage["ResourceStatus"], true) +} diff --git a/pkg/queuemanager/types.go b/pkg/queuemanager/types.go new file mode 100644 index 000000000..b1d43cb5c --- /dev/null +++ b/pkg/queuemanager/types.go @@ -0,0 +1,43 @@ +package queuemanager + +import ( + "github.com/awslabs/aws-service-operator/pkg/config" + "sync" +) + +// HandlerFunc allows you to define a custom function for when a message is stored +type HandlerFunc func(config *config.Config, msg *MessageBody) error + +// Handler allows a custom function to be passed +type Handler interface { + HandleMessage(config *config.Config, msg *MessageBody) error +} + +// Queue Manager allows you to register topics and a handler function +type QueueManager struct { + lock sync.RWMutex + handlers map[string]Handler +} + +// MessageBody will parse the message from the Body of SQS +type MessageBody struct { + Type string `json:"Type"` + TopicARN string `json:"TopicArn"` + Message string `json:"Message"` + ParsedMessage map[string]string + Namespace string + ResourceName string + ResourceProperties ResourceProperties + Updatable bool +} + +// ResourceProperties will wrap the ResourceProperties object +type ResourceProperties struct { + Tags []Tag `json:"Tags"` +} + +// Tag represents a Tag +type Tag struct { + Key string `json:"Key"` + Value string `json:"Value"` +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 13e6ae7b9..4dfb55ee6 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -8,6 +8,7 @@ import ( awsscheme "github.com/awslabs/aws-service-operator/pkg/client/clientset/versioned/scheme" "github.com/awslabs/aws-service-operator/pkg/config" opBase "github.com/awslabs/aws-service-operator/pkg/operators/base" + "github.com/awslabs/aws-service-operator/pkg/queue" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" @@ -56,11 +57,23 @@ func (c *Server) watchOperatorResources(errChan chan error, ctx context.Context) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName}) c.Config.Recorder = recorder - // start watching the aws operator resources - logger.WithFields(logrus.Fields{"resources": c.Config.Resources}).Info("Watching") - operators := opBase.New(c.Config) // TODO: remove context and Clientset + queueURL, queueARN, queueManager, err := queue.RegisterQueue(c.Config, "cloudformation") + if err != nil { + logger.WithError(err).Error("error reqistering queue") + } + c.Config.QueueURL = queueURL + c.Config.QueueARN = queueARN + + operators := opBase.New(c.Config, queueManager) + err = queue.SetQueuePolicy(c.Config, queueManager) + if err != nil { + logger.WithError(err).Error("error setting queue policy") + } + + logger.WithFields(logrus.Fields{"resources": c.Config.Resources}).Info("Watching") go operators.Watch(ctx, corev1.NamespaceAll) + go queue.Subscribe(c.Config, queueManager, ctx) <-ctx.Done() c.Config.Logger.Info("operators stopped") }