From 1bfa2b8839b97fd8f62cf75805c33092778c6ed9 Mon Sep 17 00:00:00 2001 From: Christopher Hein Date: Mon, 27 Aug 2018 15:46:19 -0700 Subject: [PATCH] Adding Generated Code for SNS Subscriptions Signed-off-by: Christopher Hein --- .../v1alpha1/cloudformationtemplate.go | 1 + pkg/apis/operator.aws/v1alpha1/dynamodb.go | 1 + .../operator.aws/v1alpha1/ecrrepository.go | 1 + pkg/apis/operator.aws/v1alpha1/s3bucket.go | 2 + .../operator.aws/v1alpha1/snssubscription.go | 69 +++++ pkg/apis/operator.aws/v1alpha1/snstopic.go | 3 +- pkg/apis/operator.aws/v1alpha1/sqsqueue.go | 1 + .../v1alpha1/zz_generated.deepcopy.go | 127 +++++++++ .../v1alpha1/fake/fake_operator.aws_client.go | 4 + .../v1alpha1/fake/fake_snssubscription.go | 128 +++++++++ .../v1alpha1/generated_expansion.go | 2 + .../v1alpha1/operator.aws_client.go | 5 + .../operator.aws/v1alpha1/snssubscription.go | 157 +++++++++++ .../informers/externalversions/generic.go | 2 + .../operator.aws/v1alpha1/interface.go | 7 + .../operator.aws/v1alpha1/snssubscription.go | 89 ++++++ .../v1alpha1/expansion_generated.go | 8 + .../operator.aws/v1alpha1/snssubscription.go | 94 +++++++ pkg/helpers/service.go | 5 +- pkg/helpers/template_functions.go | 120 ++++++++ .../cloudformationtemplate/controller.go | 4 + pkg/operator/dynamodb/cft.go | 110 ++++++-- pkg/operator/dynamodb/controller.go | 126 ++++++--- pkg/operator/ecrrepository/cft.go | 26 +- pkg/operator/ecrrepository/controller.go | 130 ++++++--- pkg/operator/s3bucket/cft.go | 82 +++++- pkg/operator/s3bucket/controller.go | 126 ++++++--- pkg/operator/snssubscription/cft.go | 233 +++++++++++++++ pkg/operator/snssubscription/controller.go | 266 ++++++++++++++++++ pkg/operator/snstopic/cft.go | 26 +- pkg/operator/snstopic/controller.go | 124 +++++--- pkg/operator/sqsqueue/cft.go | 138 +++++++-- pkg/operator/sqsqueue/controller.go | 134 ++++++--- 33 files changed, 2124 insertions(+), 227 deletions(-) create mode 100644 pkg/apis/operator.aws/v1alpha1/snssubscription.go create mode 100644 pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/fake/fake_snssubscription.go create mode 100644 pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/snssubscription.go create mode 100644 pkg/client/informers/externalversions/operator.aws/v1alpha1/snssubscription.go create mode 100644 pkg/client/listers/operator.aws/v1alpha1/snssubscription.go create mode 100644 pkg/helpers/template_functions.go create mode 100644 pkg/operator/snssubscription/cft.go create mode 100644 pkg/operator/snssubscription/controller.go diff --git a/pkg/apis/operator.aws/v1alpha1/cloudformationtemplate.go b/pkg/apis/operator.aws/v1alpha1/cloudformationtemplate.go index 7c1aadaea..5a222b8f4 100644 --- a/pkg/apis/operator.aws/v1alpha1/cloudformationtemplate.go +++ b/pkg/apis/operator.aws/v1alpha1/cloudformationtemplate.go @@ -22,6 +22,7 @@ type CloudFormationTemplate struct { type CloudFormationTemplateData struct { CloudFormationTemplateName string `json:"cloudFormationTemplateName"` CloudFormationTemplateNamespace string `json:"cloudFormationTemplateNamespace"` + RollbackCount int `json:"rollbackCount"` Key string `json:"key"` Template string `json:"template"` } diff --git a/pkg/apis/operator.aws/v1alpha1/dynamodb.go b/pkg/apis/operator.aws/v1alpha1/dynamodb.go index a43dcb19c..a21dbab71 100644 --- a/pkg/apis/operator.aws/v1alpha1/dynamodb.go +++ b/pkg/apis/operator.aws/v1alpha1/dynamodb.go @@ -34,6 +34,7 @@ type DynamoDBRangeAttribute struct { type DynamoDBSpec struct { CloudFormationTemplateName string `json:"cloudFormationTemplateName"` CloudFormationTemplateNamespace string `json:"cloudFormationTemplateNamespace"` + RollbackCount int `json:"rollbackCount"` RangeAttribute DynamoDBRangeAttribute `json:"rangeAttribute"` ReadCapacityUnits int `json:"readCapacityUnits"` WriteCapacityUnits int `json:"writeCapacityUnits"` diff --git a/pkg/apis/operator.aws/v1alpha1/ecrrepository.go b/pkg/apis/operator.aws/v1alpha1/ecrrepository.go index 64ce0ed9d..03e67c221 100644 --- a/pkg/apis/operator.aws/v1alpha1/ecrrepository.go +++ b/pkg/apis/operator.aws/v1alpha1/ecrrepository.go @@ -22,6 +22,7 @@ type ECRRepository struct { type ECRRepositorySpec struct { CloudFormationTemplateName string `json:"cloudFormationTemplateName"` CloudFormationTemplateNamespace string `json:"cloudFormationTemplateNamespace"` + RollbackCount int `json:"rollbackCount"` } diff --git a/pkg/apis/operator.aws/v1alpha1/s3bucket.go b/pkg/apis/operator.aws/v1alpha1/s3bucket.go index 5aaabbc31..3cb54af02 100644 --- a/pkg/apis/operator.aws/v1alpha1/s3bucket.go +++ b/pkg/apis/operator.aws/v1alpha1/s3bucket.go @@ -28,7 +28,9 @@ type S3BucketLogging struct { type S3BucketSpec struct { CloudFormationTemplateName string `json:"cloudFormationTemplateName"` CloudFormationTemplateNamespace string `json:"cloudFormationTemplateNamespace"` + RollbackCount int `json:"rollbackCount"` Versioning bool `json:"versioning"` + AccessControl string `json:"accessControl"` Logging S3BucketLogging `json:"logging"` } diff --git a/pkg/apis/operator.aws/v1alpha1/snssubscription.go b/pkg/apis/operator.aws/v1alpha1/snssubscription.go new file mode 100644 index 000000000..9265530f0 --- /dev/null +++ b/pkg/apis/operator.aws/v1alpha1/snssubscription.go @@ -0,0 +1,69 @@ +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +// +genclient +// +genclient:noStatus +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// SNSSubscription defines the base resource +type SNSSubscription struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata"` + Spec SNSSubscriptionSpec `json:"spec"` + Status SNSSubscriptionStatus `json:"status"` + Output SNSSubscriptionOutput `json:"output"` + AdditionalResources SNSSubscriptionAdditionalResources `json:"additionalResources"` +} +// SNSSubscriptionSpec defines the Spec resource for SNSSubscription +type SNSSubscriptionSpec struct { + CloudFormationTemplateName string `json:"cloudFormationTemplateName"` + CloudFormationTemplateNamespace string `json:"cloudFormationTemplateNamespace"` + RollbackCount int `json:"rollbackCount"` + TopicName string `json:"topicName"` + Protocol string `json:"protocol"` + Endpoint string `json:"endpoint"` + QueueURL string `json:"queueURL"` +} + + +// SNSSubscriptionOutput defines the output resource for SNSSubscription +type SNSSubscriptionOutput struct { + SubscriptionARN string `json:"subscriptionARN"` +} + +// SNSSubscriptionStatus holds the status of the Cloudformation template +type SNSSubscriptionStatus struct { + ResourceStatus string `json:"resourceStatus"` + ResourceStatusReason string `json:"resourceStatusReason"` + StackID string `json:"stackID"` +} + +// SNSSubscriptionAdditionalResources holds the additional resources +type SNSSubscriptionAdditionalResources struct { +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// SNSSubscriptionList defines the list attribute for the SNSSubscription type +type SNSSubscriptionList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + Items []SNSSubscription `json:"items"` +} + +func init() { + localSchemeBuilder.Register(addSNSSubscriptionTypes) +} + +func addSNSSubscriptionTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &SNSSubscription{}, + &SNSSubscriptionList{}, + ) + metav1.AddToGroupVersion(scheme, SchemeGroupVersion) + return nil +} diff --git a/pkg/apis/operator.aws/v1alpha1/snstopic.go b/pkg/apis/operator.aws/v1alpha1/snstopic.go index 0f236968d..1104d8a3e 100644 --- a/pkg/apis/operator.aws/v1alpha1/snstopic.go +++ b/pkg/apis/operator.aws/v1alpha1/snstopic.go @@ -21,11 +21,12 @@ type SNSTopic struct { type SNSTopicSpec struct { CloudFormationTemplateName string `json:"cloudFormationTemplateName"` CloudFormationTemplateNamespace string `json:"cloudFormationTemplateNamespace"` + RollbackCount int `json:"rollbackCount"` } // SNSTopicOutput defines the output resource for SNSTopic type SNSTopicOutput struct { - TopicName string `json:"topicName"` + TopicARN string `json:"topicARN"` } // SNSTopicStatus holds the status of the Cloudformation template diff --git a/pkg/apis/operator.aws/v1alpha1/sqsqueue.go b/pkg/apis/operator.aws/v1alpha1/sqsqueue.go index 85e36f53e..036b18b36 100644 --- a/pkg/apis/operator.aws/v1alpha1/sqsqueue.go +++ b/pkg/apis/operator.aws/v1alpha1/sqsqueue.go @@ -22,6 +22,7 @@ type SQSQueue struct { type SQSQueueSpec struct { CloudFormationTemplateName string `json:"cloudFormationTemplateName"` CloudFormationTemplateNamespace string `json:"cloudFormationTemplateNamespace"` + RollbackCount int `json:"rollbackCount"` ContentBasedDeduplication bool `json:"contentBasedDeduplication"` DelaySeconds int `json:"delaySeconds"` MaximumMessageSize int `json:"maximumMessageSize"` diff --git a/pkg/apis/operator.aws/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/operator.aws/v1alpha1/zz_generated.deepcopy.go index 3431a594f..7cc513526 100644 --- a/pkg/apis/operator.aws/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/operator.aws/v1alpha1/zz_generated.deepcopy.go @@ -588,6 +588,133 @@ func (in *S3BucketStatus) DeepCopy() *S3BucketStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SNSSubscription) DeepCopyInto(out *SNSSubscription) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + out.Status = in.Status + out.Output = in.Output + out.AdditionalResources = in.AdditionalResources + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SNSSubscription. +func (in *SNSSubscription) DeepCopy() *SNSSubscription { + if in == nil { + return nil + } + out := new(SNSSubscription) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *SNSSubscription) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SNSSubscriptionAdditionalResources) DeepCopyInto(out *SNSSubscriptionAdditionalResources) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SNSSubscriptionAdditionalResources. +func (in *SNSSubscriptionAdditionalResources) DeepCopy() *SNSSubscriptionAdditionalResources { + if in == nil { + return nil + } + out := new(SNSSubscriptionAdditionalResources) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SNSSubscriptionList) DeepCopyInto(out *SNSSubscriptionList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]SNSSubscription, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SNSSubscriptionList. +func (in *SNSSubscriptionList) DeepCopy() *SNSSubscriptionList { + if in == nil { + return nil + } + out := new(SNSSubscriptionList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *SNSSubscriptionList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SNSSubscriptionOutput) DeepCopyInto(out *SNSSubscriptionOutput) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SNSSubscriptionOutput. +func (in *SNSSubscriptionOutput) DeepCopy() *SNSSubscriptionOutput { + if in == nil { + return nil + } + out := new(SNSSubscriptionOutput) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SNSSubscriptionSpec) DeepCopyInto(out *SNSSubscriptionSpec) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SNSSubscriptionSpec. +func (in *SNSSubscriptionSpec) DeepCopy() *SNSSubscriptionSpec { + if in == nil { + return nil + } + out := new(SNSSubscriptionSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SNSSubscriptionStatus) DeepCopyInto(out *SNSSubscriptionStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SNSSubscriptionStatus. +func (in *SNSSubscriptionStatus) DeepCopy() *SNSSubscriptionStatus { + if in == nil { + return nil + } + out := new(SNSSubscriptionStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SNSTopic) DeepCopyInto(out *SNSTopic) { *out = *in diff --git a/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/fake/fake_operator.aws_client.go b/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/fake/fake_operator.aws_client.go index cc6147315..e6f968041 100644 --- a/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/fake/fake_operator.aws_client.go +++ b/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/fake/fake_operator.aws_client.go @@ -44,6 +44,10 @@ func (c *FakeOperatorV1alpha1) S3Buckets(namespace string) v1alpha1.S3BucketInte return &FakeS3Buckets{c, namespace} } +func (c *FakeOperatorV1alpha1) SNSSubscriptions(namespace string) v1alpha1.SNSSubscriptionInterface { + return &FakeSNSSubscriptions{c, namespace} +} + func (c *FakeOperatorV1alpha1) SNSTopics(namespace string) v1alpha1.SNSTopicInterface { return &FakeSNSTopics{c, namespace} } diff --git a/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/fake/fake_snssubscription.go b/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/fake/fake_snssubscription.go new file mode 100644 index 000000000..2d6a10f8c --- /dev/null +++ b/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/fake/fake_snssubscription.go @@ -0,0 +1,128 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1alpha1 "github.com/christopherhein/aws-operator/pkg/apis/operator.aws/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeSNSSubscriptions implements SNSSubscriptionInterface +type FakeSNSSubscriptions struct { + Fake *FakeOperatorV1alpha1 + ns string +} + +var snssubscriptionsResource = schema.GroupVersionResource{Group: "operator.aws", Version: "v1alpha1", Resource: "snssubscriptions"} + +var snssubscriptionsKind = schema.GroupVersionKind{Group: "operator.aws", Version: "v1alpha1", Kind: "SNSSubscription"} + +// Get takes name of the sNSSubscription, and returns the corresponding sNSSubscription object, and an error if there is any. +func (c *FakeSNSSubscriptions) Get(name string, options v1.GetOptions) (result *v1alpha1.SNSSubscription, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(snssubscriptionsResource, c.ns, name), &v1alpha1.SNSSubscription{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.SNSSubscription), err +} + +// List takes label and field selectors, and returns the list of SNSSubscriptions that match those selectors. +func (c *FakeSNSSubscriptions) List(opts v1.ListOptions) (result *v1alpha1.SNSSubscriptionList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(snssubscriptionsResource, snssubscriptionsKind, c.ns, opts), &v1alpha1.SNSSubscriptionList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.SNSSubscriptionList{} + for _, item := range obj.(*v1alpha1.SNSSubscriptionList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested sNSSubscriptions. +func (c *FakeSNSSubscriptions) Watch(opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(snssubscriptionsResource, c.ns, opts)) + +} + +// Create takes the representation of a sNSSubscription and creates it. Returns the server's representation of the sNSSubscription, and an error, if there is any. +func (c *FakeSNSSubscriptions) Create(sNSSubscription *v1alpha1.SNSSubscription) (result *v1alpha1.SNSSubscription, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(snssubscriptionsResource, c.ns, sNSSubscription), &v1alpha1.SNSSubscription{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.SNSSubscription), err +} + +// Update takes the representation of a sNSSubscription and updates it. Returns the server's representation of the sNSSubscription, and an error, if there is any. +func (c *FakeSNSSubscriptions) Update(sNSSubscription *v1alpha1.SNSSubscription) (result *v1alpha1.SNSSubscription, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(snssubscriptionsResource, c.ns, sNSSubscription), &v1alpha1.SNSSubscription{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.SNSSubscription), err +} + +// Delete takes name of the sNSSubscription and deletes it. Returns an error if one occurs. +func (c *FakeSNSSubscriptions) Delete(name string, options *v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(snssubscriptionsResource, c.ns, name), &v1alpha1.SNSSubscription{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeSNSSubscriptions) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(snssubscriptionsResource, c.ns, listOptions) + + _, err := c.Fake.Invokes(action, &v1alpha1.SNSSubscriptionList{}) + return err +} + +// Patch applies the patch and returns the patched sNSSubscription. +func (c *FakeSNSSubscriptions) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.SNSSubscription, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(snssubscriptionsResource, c.ns, name, data, subresources...), &v1alpha1.SNSSubscription{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.SNSSubscription), err +} diff --git a/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/generated_expansion.go index 9504f4d22..4f7dad7b9 100644 --- a/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/generated_expansion.go @@ -26,6 +26,8 @@ type ECRRepositoryExpansion interface{} type S3BucketExpansion interface{} +type SNSSubscriptionExpansion interface{} + type SNSTopicExpansion interface{} type SQSQueueExpansion interface{} diff --git a/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/operator.aws_client.go b/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/operator.aws_client.go index 3a13f1305..6d8dd515e 100644 --- a/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/operator.aws_client.go +++ b/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/operator.aws_client.go @@ -31,6 +31,7 @@ type OperatorV1alpha1Interface interface { DynamoDBsGetter ECRRepositoriesGetter S3BucketsGetter + SNSSubscriptionsGetter SNSTopicsGetter SQSQueuesGetter } @@ -56,6 +57,10 @@ func (c *OperatorV1alpha1Client) S3Buckets(namespace string) S3BucketInterface { return newS3Buckets(c, namespace) } +func (c *OperatorV1alpha1Client) SNSSubscriptions(namespace string) SNSSubscriptionInterface { + return newSNSSubscriptions(c, namespace) +} + func (c *OperatorV1alpha1Client) SNSTopics(namespace string) SNSTopicInterface { return newSNSTopics(c, namespace) } diff --git a/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/snssubscription.go b/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/snssubscription.go new file mode 100644 index 000000000..038c6cb26 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1/snssubscription.go @@ -0,0 +1,157 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/christopherhein/aws-operator/pkg/apis/operator.aws/v1alpha1" + scheme "github.com/christopherhein/aws-operator/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// SNSSubscriptionsGetter has a method to return a SNSSubscriptionInterface. +// A group's client should implement this interface. +type SNSSubscriptionsGetter interface { + SNSSubscriptions(namespace string) SNSSubscriptionInterface +} + +// SNSSubscriptionInterface has methods to work with SNSSubscription resources. +type SNSSubscriptionInterface interface { + Create(*v1alpha1.SNSSubscription) (*v1alpha1.SNSSubscription, error) + Update(*v1alpha1.SNSSubscription) (*v1alpha1.SNSSubscription, error) + Delete(name string, options *v1.DeleteOptions) error + DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error + Get(name string, options v1.GetOptions) (*v1alpha1.SNSSubscription, error) + List(opts v1.ListOptions) (*v1alpha1.SNSSubscriptionList, error) + Watch(opts v1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.SNSSubscription, err error) + SNSSubscriptionExpansion +} + +// sNSSubscriptions implements SNSSubscriptionInterface +type sNSSubscriptions struct { + client rest.Interface + ns string +} + +// newSNSSubscriptions returns a SNSSubscriptions +func newSNSSubscriptions(c *OperatorV1alpha1Client, namespace string) *sNSSubscriptions { + return &sNSSubscriptions{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the sNSSubscription, and returns the corresponding sNSSubscription object, and an error if there is any. +func (c *sNSSubscriptions) Get(name string, options v1.GetOptions) (result *v1alpha1.SNSSubscription, err error) { + result = &v1alpha1.SNSSubscription{} + err = c.client.Get(). + Namespace(c.ns). + Resource("snssubscriptions"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of SNSSubscriptions that match those selectors. +func (c *sNSSubscriptions) List(opts v1.ListOptions) (result *v1alpha1.SNSSubscriptionList, err error) { + result = &v1alpha1.SNSSubscriptionList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("snssubscriptions"). + VersionedParams(&opts, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested sNSSubscriptions. +func (c *sNSSubscriptions) Watch(opts v1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("snssubscriptions"). + VersionedParams(&opts, scheme.ParameterCodec). + Watch() +} + +// Create takes the representation of a sNSSubscription and creates it. Returns the server's representation of the sNSSubscription, and an error, if there is any. +func (c *sNSSubscriptions) Create(sNSSubscription *v1alpha1.SNSSubscription) (result *v1alpha1.SNSSubscription, err error) { + result = &v1alpha1.SNSSubscription{} + err = c.client.Post(). + Namespace(c.ns). + Resource("snssubscriptions"). + Body(sNSSubscription). + Do(). + Into(result) + return +} + +// Update takes the representation of a sNSSubscription and updates it. Returns the server's representation of the sNSSubscription, and an error, if there is any. +func (c *sNSSubscriptions) Update(sNSSubscription *v1alpha1.SNSSubscription) (result *v1alpha1.SNSSubscription, err error) { + result = &v1alpha1.SNSSubscription{} + err = c.client.Put(). + Namespace(c.ns). + Resource("snssubscriptions"). + Name(sNSSubscription.Name). + Body(sNSSubscription). + Do(). + Into(result) + return +} + +// Delete takes name of the sNSSubscription and deletes it. Returns an error if one occurs. +func (c *sNSSubscriptions) Delete(name string, options *v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("snssubscriptions"). + Name(name). + Body(options). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *sNSSubscriptions) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("snssubscriptions"). + VersionedParams(&listOptions, scheme.ParameterCodec). + Body(options). + Do(). + Error() +} + +// Patch applies the patch and returns the patched sNSSubscription. +func (c *sNSSubscriptions) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.SNSSubscription, err error) { + result = &v1alpha1.SNSSubscription{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("snssubscriptions"). + SubResource(subresources...). + Name(name). + Body(data). + Do(). + Into(result) + return +} diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index 035473c6a..ff4ef2160 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -61,6 +61,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Operator().V1alpha1().ECRRepositories().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("s3buckets"): return &genericInformer{resource: resource.GroupResource(), informer: f.Operator().V1alpha1().S3Buckets().Informer()}, nil + case v1alpha1.SchemeGroupVersion.WithResource("snssubscriptions"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Operator().V1alpha1().SNSSubscriptions().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("snstopics"): return &genericInformer{resource: resource.GroupResource(), informer: f.Operator().V1alpha1().SNSTopics().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("sqsqueues"): diff --git a/pkg/client/informers/externalversions/operator.aws/v1alpha1/interface.go b/pkg/client/informers/externalversions/operator.aws/v1alpha1/interface.go index 45860fcc2..a374b10ec 100644 --- a/pkg/client/informers/externalversions/operator.aws/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/operator.aws/v1alpha1/interface.go @@ -32,6 +32,8 @@ type Interface interface { ECRRepositories() ECRRepositoryInformer // S3Buckets returns a S3BucketInformer. S3Buckets() S3BucketInformer + // SNSSubscriptions returns a SNSSubscriptionInformer. + SNSSubscriptions() SNSSubscriptionInformer // SNSTopics returns a SNSTopicInformer. SNSTopics() SNSTopicInformer // SQSQueues returns a SQSQueueInformer. @@ -69,6 +71,11 @@ func (v *version) S3Buckets() S3BucketInformer { return &s3BucketInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// SNSSubscriptions returns a SNSSubscriptionInformer. +func (v *version) SNSSubscriptions() SNSSubscriptionInformer { + return &sNSSubscriptionInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // SNSTopics returns a SNSTopicInformer. func (v *version) SNSTopics() SNSTopicInformer { return &sNSTopicInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/client/informers/externalversions/operator.aws/v1alpha1/snssubscription.go b/pkg/client/informers/externalversions/operator.aws/v1alpha1/snssubscription.go new file mode 100644 index 000000000..b472af663 --- /dev/null +++ b/pkg/client/informers/externalversions/operator.aws/v1alpha1/snssubscription.go @@ -0,0 +1,89 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + time "time" + + operator_aws_v1alpha1 "github.com/christopherhein/aws-operator/pkg/apis/operator.aws/v1alpha1" + versioned "github.com/christopherhein/aws-operator/pkg/client/clientset/versioned" + internalinterfaces "github.com/christopherhein/aws-operator/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/christopherhein/aws-operator/pkg/client/listers/operator.aws/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// SNSSubscriptionInformer provides access to a shared informer and lister for +// SNSSubscriptions. +type SNSSubscriptionInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.SNSSubscriptionLister +} + +type sNSSubscriptionInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewSNSSubscriptionInformer constructs a new informer for SNSSubscription type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewSNSSubscriptionInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredSNSSubscriptionInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredSNSSubscriptionInformer constructs a new informer for SNSSubscription type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredSNSSubscriptionInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.OperatorV1alpha1().SNSSubscriptions(namespace).List(options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.OperatorV1alpha1().SNSSubscriptions(namespace).Watch(options) + }, + }, + &operator_aws_v1alpha1.SNSSubscription{}, + resyncPeriod, + indexers, + ) +} + +func (f *sNSSubscriptionInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredSNSSubscriptionInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *sNSSubscriptionInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&operator_aws_v1alpha1.SNSSubscription{}, f.defaultInformer) +} + +func (f *sNSSubscriptionInformer) Lister() v1alpha1.SNSSubscriptionLister { + return v1alpha1.NewSNSSubscriptionLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/listers/operator.aws/v1alpha1/expansion_generated.go b/pkg/client/listers/operator.aws/v1alpha1/expansion_generated.go index dadd32dde..660a6225b 100644 --- a/pkg/client/listers/operator.aws/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/operator.aws/v1alpha1/expansion_generated.go @@ -50,6 +50,14 @@ type S3BucketListerExpansion interface{} // S3BucketNamespaceLister. type S3BucketNamespaceListerExpansion interface{} +// SNSSubscriptionListerExpansion allows custom methods to be added to +// SNSSubscriptionLister. +type SNSSubscriptionListerExpansion interface{} + +// SNSSubscriptionNamespaceListerExpansion allows custom methods to be added to +// SNSSubscriptionNamespaceLister. +type SNSSubscriptionNamespaceListerExpansion interface{} + // SNSTopicListerExpansion allows custom methods to be added to // SNSTopicLister. type SNSTopicListerExpansion interface{} diff --git a/pkg/client/listers/operator.aws/v1alpha1/snssubscription.go b/pkg/client/listers/operator.aws/v1alpha1/snssubscription.go new file mode 100644 index 000000000..124be8030 --- /dev/null +++ b/pkg/client/listers/operator.aws/v1alpha1/snssubscription.go @@ -0,0 +1,94 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/christopherhein/aws-operator/pkg/apis/operator.aws/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// SNSSubscriptionLister helps list SNSSubscriptions. +type SNSSubscriptionLister interface { + // List lists all SNSSubscriptions in the indexer. + List(selector labels.Selector) (ret []*v1alpha1.SNSSubscription, err error) + // SNSSubscriptions returns an object that can list and get SNSSubscriptions. + SNSSubscriptions(namespace string) SNSSubscriptionNamespaceLister + SNSSubscriptionListerExpansion +} + +// sNSSubscriptionLister implements the SNSSubscriptionLister interface. +type sNSSubscriptionLister struct { + indexer cache.Indexer +} + +// NewSNSSubscriptionLister returns a new SNSSubscriptionLister. +func NewSNSSubscriptionLister(indexer cache.Indexer) SNSSubscriptionLister { + return &sNSSubscriptionLister{indexer: indexer} +} + +// List lists all SNSSubscriptions in the indexer. +func (s *sNSSubscriptionLister) List(selector labels.Selector) (ret []*v1alpha1.SNSSubscription, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.SNSSubscription)) + }) + return ret, err +} + +// SNSSubscriptions returns an object that can list and get SNSSubscriptions. +func (s *sNSSubscriptionLister) SNSSubscriptions(namespace string) SNSSubscriptionNamespaceLister { + return sNSSubscriptionNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// SNSSubscriptionNamespaceLister helps list and get SNSSubscriptions. +type SNSSubscriptionNamespaceLister interface { + // List lists all SNSSubscriptions in the indexer for a given namespace. + List(selector labels.Selector) (ret []*v1alpha1.SNSSubscription, err error) + // Get retrieves the SNSSubscription from the indexer for a given namespace and name. + Get(name string) (*v1alpha1.SNSSubscription, error) + SNSSubscriptionNamespaceListerExpansion +} + +// sNSSubscriptionNamespaceLister implements the SNSSubscriptionNamespaceLister +// interface. +type sNSSubscriptionNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all SNSSubscriptions in the indexer for a given namespace. +func (s sNSSubscriptionNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.SNSSubscription, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.SNSSubscription)) + }) + return ret, err +} + +// Get retrieves the SNSSubscription from the indexer for a given namespace and name. +func (s sNSSubscriptionNamespaceLister) Get(name string) (*v1alpha1.SNSSubscription, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("snssubscription"), name) + } + return obj.(*v1alpha1.SNSSubscription), nil +} diff --git a/pkg/helpers/service.go b/pkg/helpers/service.go index fc8a80f12..0450d9ea9 100644 --- a/pkg/helpers/service.go +++ b/pkg/helpers/service.go @@ -8,8 +8,9 @@ import ( // Data wrapps the object that is needed for the services type Data struct { - Obj interface{} - Config *config.Config + Helpers Helpers + Obj interface{} + Config *config.Config } // CreateExternalNameService will create a Kubernetes Servic Using ExternalName types diff --git a/pkg/helpers/template_functions.go b/pkg/helpers/template_functions.go new file mode 100644 index 000000000..3f10a3929 --- /dev/null +++ b/pkg/helpers/template_functions.go @@ -0,0 +1,120 @@ +// >>>>>>> DO NOT EDIT THIS FILE <<<<<<<<<< +// This file is autogenerated via `aws-operator-codegen process` +// If you'd like the change anything about this file make edits to the .templ +// file in the pkg/codegen/assets directory. + +package helpers + +import ( + awsclient "github.com/christopherhein/aws-operator/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1" + "github.com/christopherhein/aws-operator/pkg/config" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// New Will return helpers to be passed into the templates +func New() Helpers { + return Helpers{ + GetCloudFormationTemplateByName: GetCloudFormationTemplateByName, + GetDynamoDBByName: GetDynamoDBByName, + GetECRRepositoryByName: GetECRRepositoryByName, + GetS3BucketByName: GetS3BucketByName, + GetSNSSubscriptionByName: GetSNSSubscriptionByName, + GetSNSTopicByName: GetSNSTopicByName, + GetSQSQueueByName: GetSQSQueueByName, + } +} + +// Helpers defines all the Helper functions that are exposed to the templates +type Helpers struct { + GetCloudFormationTemplateByName func(*config.Config, string, string) (interface{}, error) + GetDynamoDBByName func(*config.Config, string, string) (interface{}, error) + GetECRRepositoryByName func(*config.Config, string, string) (interface{}, error) + GetS3BucketByName func(*config.Config, string, string) (interface{}, error) + GetSNSSubscriptionByName func(*config.Config, string, string) (interface{}, error) + GetSNSTopicByName func(*config.Config, string, string) (interface{}, error) + GetSQSQueueByName func(*config.Config, string, string) (interface{}, error) +} +// GetCloudFormationTemplateByName will find the resource by name +func GetCloudFormationTemplateByName(config *config.Config, name string, namespace string) (interface{}, error) { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.CloudFormationTemplates(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting sns topic") + return "", err + } + + return resource, nil +} +// GetDynamoDBByName will find the resource by name +func GetDynamoDBByName(config *config.Config, name string, namespace string) (interface{}, error) { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.DynamoDBs(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting sns topic") + return "", err + } + + return resource, nil +} +// GetECRRepositoryByName will find the resource by name +func GetECRRepositoryByName(config *config.Config, name string, namespace string) (interface{}, error) { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.ECRRepositories(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting sns topic") + return "", err + } + + return resource, nil +} +// GetS3BucketByName will find the resource by name +func GetS3BucketByName(config *config.Config, name string, namespace string) (interface{}, error) { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.S3Buckets(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting sns topic") + return "", err + } + + return resource, nil +} +// GetSNSSubscriptionByName will find the resource by name +func GetSNSSubscriptionByName(config *config.Config, name string, namespace string) (interface{}, error) { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.SNSSubscriptions(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting sns topic") + return "", err + } + + return resource, nil +} +// GetSNSTopicByName will find the resource by name +func GetSNSTopicByName(config *config.Config, name string, namespace string) (interface{}, error) { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.SNSTopics(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting sns topic") + return "", err + } + + return resource, nil +} +// GetSQSQueueByName will find the resource by name +func GetSQSQueueByName(config *config.Config, name string, namespace string) (interface{}, error) { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.SQSQueues(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting sns topic") + return "", err + } + + return resource, nil +} diff --git a/pkg/operator/cloudformationtemplate/controller.go b/pkg/operator/cloudformationtemplate/controller.go index 1307ed0d2..1ffd1c5eb 100644 --- a/pkg/operator/cloudformationtemplate/controller.go +++ b/pkg/operator/cloudformationtemplate/controller.go @@ -76,6 +76,10 @@ func (c *Controller) onAdd(obj interface{}) { func (c *Controller) onUpdate(oldObj, newObj interface{}) { oo := oldObj.(*awsV1alpha1.CloudFormationTemplate).DeepCopy() no := newObj.(*awsV1alpha1.CloudFormationTemplate).DeepCopy() + + if no.Status.ResourceStatus == "DELETE_COMPLETE" { + c.onAdd(no) + } cloudformationtemplate.OnUpdate(c.config, oo, no) } diff --git a/pkg/operator/dynamodb/cft.go b/pkg/operator/dynamodb/cft.go index 733c8b7db..b57196a63 100644 --- a/pkg/operator/dynamodb/cft.go +++ b/pkg/operator/dynamodb/cft.go @@ -30,6 +30,11 @@ type Cloudformation struct { topicARN string } +// StackName returns the name of the stack based on the aws-operator-config +func (s *Cloudformation) StackName() string { + return helpers.StackName(s.config.ClusterName, "dynamodb", s.DynamoDB.Name, s.DynamoDB.Namespace) +} + // GetOutputs return the stack outputs from the DescribeStacks call func (s *Cloudformation) GetOutputs() (map[string]string, error) { outputs := map[string]string{} @@ -37,7 +42,7 @@ func (s *Cloudformation) GetOutputs() (map[string]string, error) { svc := cloudformation.New(sess) stackInputs := cloudformation.DescribeStacksInput{ - StackName: aws.String(s.DynamoDB.Name), + StackName: aws.String(s.StackName()), } output, err := svc.DescribeStacks(&stackInputs) @@ -64,7 +69,7 @@ func (s *Cloudformation) CreateStack() (output *cloudformation.CreateStackOutput cftemplate := helpers.GetCloudFormationTemplate(s.config, "dynamodb", s.DynamoDB.Spec.CloudFormationTemplateName, s.DynamoDB.Spec.CloudFormationTemplateNamespace) stackInputs := cloudformation.CreateStackInput{ - StackName: aws.String(s.DynamoDB.Name), + StackName: aws.String(s.StackName()), TemplateURL: aws.String(cftemplate), NotificationARNs: []*string{ aws.String(s.topicARN), @@ -76,12 +81,42 @@ func (s *Cloudformation) CreateStack() (output *cloudformation.CreateStackOutput namespace := helpers.CreateParam("Namespace", s.DynamoDB.Namespace) clusterName := helpers.CreateParam("ClusterName", s.config.ClusterName) tableName := helpers.CreateParam("TableName", helpers.Stringify(s.DynamoDB.Name)) - rangeAttributeName := helpers.CreateParam("RangeAttributeName", helpers.Stringify(s.DynamoDB.Spec.RangeAttribute.Name)) - rangeAttributeType := helpers.CreateParam("RangeAttributeType", helpers.Stringify(s.DynamoDB.Spec.RangeAttribute.Type)) - readCapacityUnits := helpers.CreateParam("ReadCapacityUnits", helpers.Stringify(s.DynamoDB.Spec.ReadCapacityUnits)) - writeCapacityUnits := helpers.CreateParam("WriteCapacityUnits", helpers.Stringify(s.DynamoDB.Spec.WriteCapacityUnits)) - hashAttributeName := helpers.CreateParam("HashAttributeName", helpers.Stringify(s.DynamoDB.Spec.HashAttribute.Name)) - hashAttributeType := helpers.CreateParam("HashAttributeType", helpers.Stringify(s.DynamoDB.Spec.HashAttribute.Type)) + rangeAttributeNameTemp := "{{.Obj.Spec.RangeAttribute.Name}}" + rangeAttributeNameValue, err := helpers.Templatize(rangeAttributeNameTemp, helpers.Data{Obj: s.DynamoDB, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + rangeAttributeName := helpers.CreateParam("RangeAttributeName", helpers.Stringify(rangeAttributeNameValue)) + rangeAttributeTypeTemp := "{{.Obj.Spec.RangeAttribute.Type}}" + rangeAttributeTypeValue, err := helpers.Templatize(rangeAttributeTypeTemp, helpers.Data{Obj: s.DynamoDB, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + rangeAttributeType := helpers.CreateParam("RangeAttributeType", helpers.Stringify(rangeAttributeTypeValue)) + readCapacityUnitsTemp := "{{.Obj.Spec.ReadCapacityUnits}}" + readCapacityUnitsValue, err := helpers.Templatize(readCapacityUnitsTemp, helpers.Data{Obj: s.DynamoDB, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + readCapacityUnits := helpers.CreateParam("ReadCapacityUnits", helpers.Stringify(readCapacityUnitsValue)) + writeCapacityUnitsTemp := "{{.Obj.Spec.WriteCapacityUnits}}" + writeCapacityUnitsValue, err := helpers.Templatize(writeCapacityUnitsTemp, helpers.Data{Obj: s.DynamoDB, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + writeCapacityUnits := helpers.CreateParam("WriteCapacityUnits", helpers.Stringify(writeCapacityUnitsValue)) + hashAttributeNameTemp := "{{.Obj.Spec.HashAttribute.Name}}" + hashAttributeNameValue, err := helpers.Templatize(hashAttributeNameTemp, helpers.Data{Obj: s.DynamoDB, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + hashAttributeName := helpers.CreateParam("HashAttributeName", helpers.Stringify(hashAttributeNameValue)) + hashAttributeTypeTemp := "{{.Obj.Spec.HashAttribute.Type}}" + hashAttributeTypeValue, err := helpers.Templatize(hashAttributeTypeTemp, helpers.Data{Obj: s.DynamoDB, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + hashAttributeType := helpers.CreateParam("HashAttributeType", helpers.Stringify(hashAttributeTypeValue)) parameters := []*cloudformation.Parameter{} parameters = append(parameters, resourceName) @@ -123,7 +158,7 @@ func (s *Cloudformation) UpdateStack(updated *awsV1alpha1.DynamoDB) (output *clo cftemplate := helpers.GetCloudFormationTemplate(s.config, "dynamodb", updated.Spec.CloudFormationTemplateName, updated.Spec.CloudFormationTemplateNamespace) stackInputs := cloudformation.UpdateStackInput{ - StackName: aws.String(s.DynamoDB.Name), + StackName: aws.String(s.StackName()), TemplateURL: aws.String(cftemplate), NotificationARNs: []*string{ aws.String(s.topicARN), @@ -135,12 +170,42 @@ func (s *Cloudformation) UpdateStack(updated *awsV1alpha1.DynamoDB) (output *clo namespace := helpers.CreateParam("Namespace", s.DynamoDB.Namespace) clusterName := helpers.CreateParam("ClusterName", s.config.ClusterName) tableName := helpers.CreateParam("TableName", helpers.Stringify(s.DynamoDB.Name)) - rangeAttributeName := helpers.CreateParam("RangeAttributeName", helpers.Stringify(updated.Spec.RangeAttribute.Name)) - rangeAttributeType := helpers.CreateParam("RangeAttributeType", helpers.Stringify(updated.Spec.RangeAttribute.Type)) - readCapacityUnits := helpers.CreateParam("ReadCapacityUnits", helpers.Stringify(updated.Spec.ReadCapacityUnits)) - writeCapacityUnits := helpers.CreateParam("WriteCapacityUnits", helpers.Stringify(updated.Spec.WriteCapacityUnits)) - hashAttributeName := helpers.CreateParam("HashAttributeName", helpers.Stringify(updated.Spec.HashAttribute.Name)) - hashAttributeType := helpers.CreateParam("HashAttributeType", helpers.Stringify(updated.Spec.HashAttribute.Type)) + rangeAttributeNameTemp := "{{.Obj.Spec.RangeAttribute.Name}}" + rangeAttributeNameValue, err := helpers.Templatize(rangeAttributeNameTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + rangeAttributeName := helpers.CreateParam("RangeAttributeName", helpers.Stringify(rangeAttributeNameValue)) + rangeAttributeTypeTemp := "{{.Obj.Spec.RangeAttribute.Type}}" + rangeAttributeTypeValue, err := helpers.Templatize(rangeAttributeTypeTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + rangeAttributeType := helpers.CreateParam("RangeAttributeType", helpers.Stringify(rangeAttributeTypeValue)) + readCapacityUnitsTemp := "{{.Obj.Spec.ReadCapacityUnits}}" + readCapacityUnitsValue, err := helpers.Templatize(readCapacityUnitsTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + readCapacityUnits := helpers.CreateParam("ReadCapacityUnits", helpers.Stringify(readCapacityUnitsValue)) + writeCapacityUnitsTemp := "{{.Obj.Spec.WriteCapacityUnits}}" + writeCapacityUnitsValue, err := helpers.Templatize(writeCapacityUnitsTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + writeCapacityUnits := helpers.CreateParam("WriteCapacityUnits", helpers.Stringify(writeCapacityUnitsValue)) + hashAttributeNameTemp := "{{.Obj.Spec.HashAttribute.Name}}" + hashAttributeNameValue, err := helpers.Templatize(hashAttributeNameTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + hashAttributeName := helpers.CreateParam("HashAttributeName", helpers.Stringify(hashAttributeNameValue)) + hashAttributeTypeTemp := "{{.Obj.Spec.HashAttribute.Type}}" + hashAttributeTypeValue, err := helpers.Templatize(hashAttributeTypeTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + hashAttributeType := helpers.CreateParam("HashAttributeType", helpers.Stringify(hashAttributeTypeValue)) parameters := []*cloudformation.Parameter{} parameters = append(parameters, resourceName) @@ -180,8 +245,21 @@ func (s *Cloudformation) DeleteStack() (err error) { svc := cloudformation.New(sess) stackInputs := cloudformation.DeleteStackInput{} - stackInputs.SetStackName(s.DynamoDB.Name) + stackInputs.SetStackName(s.StackName()) _, err = svc.DeleteStack(&stackInputs) return } + +// WaitUntilStackDeleted will delete the stack +func (s *Cloudformation) WaitUntilStackDeleted() (err error) { + sess := s.config.AWSSession + svc := cloudformation.New(sess) + + stackInputs := cloudformation.DescribeStacksInput{ + StackName: aws.String(s.StackName()), + } + + err = svc.WaitUntilStackDeleteComplete(&stackInputs) + return +} diff --git a/pkg/operator/dynamodb/controller.go b/pkg/operator/dynamodb/controller.go index 751b8f5cf..59e0207cc 100644 --- a/pkg/operator/dynamodb/controller.go +++ b/pkg/operator/dynamodb/controller.go @@ -95,9 +95,26 @@ func QueueUpdater(config *config.Config, msg *queue.MessageBody) error { } if name != "" && namespace != "" { - err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) - if err != nil { - return err + if msg.ParsedMessage["ResourceStatus"] == "ROLLBACK_COMPLETE" { + err := deleteStack(config, name, namespace, msg.ParsedMessage["StackId"]) + if err != nil { + return err + } + } else if msg.ParsedMessage["ResourceStatus"] == "DELETE_COMPLETE" { + err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) + if err != nil { + return err + } + + err = incrementRollbackCount(config, name, namespace) + if err != nil { + return err + } + } else { + err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) + if err != nil { + return err + } } } @@ -106,7 +123,7 @@ func QueueUpdater(config *config.Config, msg *queue.MessageBody) error { func (c *Controller) onAdd(obj interface{}) { s := obj.(*awsV1alpha1.DynamoDB).DeepCopy() - if s.Status.ResourceStatus == "" { + if s.Status.ResourceStatus == "" || s.Status.ResourceStatus == "DELETE_COMPLETE" { cft := New(c.config, s, c.topicARN) output, err := cft.CreateStack() if err != nil { @@ -126,7 +143,11 @@ func (c *Controller) onAdd(obj interface{}) { func (c *Controller) onUpdate(oldObj, newObj interface{}) { oo := oldObj.(*awsV1alpha1.DynamoDB).DeepCopy() no := newObj.(*awsV1alpha1.DynamoDB).DeepCopy() - if helpers.IsStackComplete(oo.Status.ResourceStatus, false) { + + if no.Status.ResourceStatus == "DELETE_COMPLETE" { + c.onAdd(no) + } + if helpers.IsStackComplete(oo.Status.ResourceStatus, false) && !reflect.DeepEqual(oo.Spec, no.Spec) { cft := New(c.config, oo, c.topicARN) output, err := cft.UpdateStack(no) if err != nil { @@ -154,41 +175,80 @@ func (c *Controller) onDelete(obj interface{}) { c.config.Logger.Infof("deleted dynamodb '%s'", s.Name) } +func incrementRollbackCount(config *config.Config, name string, namespace string) error { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.DynamoDBs(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting dynamodbs") + return err + } + + resourceCopy := resource.DeepCopy() + resourceCopy.Spec.RollbackCount = resourceCopy.Spec.RollbackCount+1 + + _, err = clientSet.DynamoDBs(namespace).Update(resourceCopy) + if err != nil { + logger.WithError(err).Error("error updating resource") + return err + } + return nil +} + func updateStatus(config *config.Config, name string, namespace string, stackID string, status string, reason string) error { - logger := config.Logger - clientSet, _ := awsclient.NewForConfig(config.RESTConfig) - resource, err := clientSet.DynamoDBs(namespace).Get(name, metav1.GetOptions{}) + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.DynamoDBs(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting dynamodbs") + return err + } + + resourceCopy := resource.DeepCopy() + resourceCopy.Status.ResourceStatus = status + resourceCopy.Status.ResourceStatusReason = reason + resourceCopy.Status.StackID = stackID + + if helpers.IsStackComplete(status, false) { + cft := New(config, resourceCopy, "") + outputs, err := cft.GetOutputs() if err != nil { - logger.WithError(err).Error("error getting dynamodbs") - return err + logger.WithError(err).Error("error getting outputs") } + resourceCopy.Output.TableName = outputs["TableName"] + resourceCopy.Output.TableARN = outputs["TableArn"] + } - resourceCopy := resource.DeepCopy() - resourceCopy.Status.ResourceStatus = status - resourceCopy.Status.ResourceStatusReason = reason - resourceCopy.Status.StackID = stackID + _, err = clientSet.DynamoDBs(namespace).Update(resourceCopy) + if err != nil { + logger.WithError(err).Error("error updating resource") + return err + } - if helpers.IsStackComplete(status, true) { - cft := New(config, resourceCopy, "") - outputs, err := cft.GetOutputs() - if err != nil { - logger.WithError(err).Error("error getting outputs") - } - resourceCopy.Output.TableName = outputs["TableName"] - resourceCopy.Output.TableARN = outputs["TableArn"] - } + err = syncAdditionalResources(config, resourceCopy) + if err != nil { + logger.WithError(err).Info("error syncing resources") + } + return nil +} - _, err = clientSet.DynamoDBs(namespace).Update(resourceCopy) - if err != nil { - logger.WithError(err).Error("error updating resource") - return err - } +func deleteStack(config *config.Config, name string, namespace string, stackID string) error { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.DynamoDBs(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting dynamodbs") + return err + } - err = syncAdditionalResources(config, resourceCopy) - if err != nil { - logger.WithError(err).Info("error syncing resources") - } - return nil + cft := New(config, resource, "") + err = cft.DeleteStack() + if err != nil { + return err + } + + err = cft.WaitUntilStackDeleted() + return err } func syncAdditionalResources(config *config.Config, s *awsV1alpha1.DynamoDB) (err error) { diff --git a/pkg/operator/ecrrepository/cft.go b/pkg/operator/ecrrepository/cft.go index 1c753273b..04704f454 100644 --- a/pkg/operator/ecrrepository/cft.go +++ b/pkg/operator/ecrrepository/cft.go @@ -30,6 +30,11 @@ type Cloudformation struct { topicARN string } +// StackName returns the name of the stack based on the aws-operator-config +func (s *Cloudformation) StackName() string { + return helpers.StackName(s.config.ClusterName, "ecrrepository", s.ECRRepository.Name, s.ECRRepository.Namespace) +} + // GetOutputs return the stack outputs from the DescribeStacks call func (s *Cloudformation) GetOutputs() (map[string]string, error) { outputs := map[string]string{} @@ -37,7 +42,7 @@ func (s *Cloudformation) GetOutputs() (map[string]string, error) { svc := cloudformation.New(sess) stackInputs := cloudformation.DescribeStacksInput{ - StackName: aws.String(s.ECRRepository.Name), + StackName: aws.String(s.StackName()), } output, err := svc.DescribeStacks(&stackInputs) @@ -64,7 +69,7 @@ func (s *Cloudformation) CreateStack() (output *cloudformation.CreateStackOutput cftemplate := helpers.GetCloudFormationTemplate(s.config, "ecrrepository", s.ECRRepository.Spec.CloudFormationTemplateName, s.ECRRepository.Spec.CloudFormationTemplateNamespace) stackInputs := cloudformation.CreateStackInput{ - StackName: aws.String(s.ECRRepository.Name), + StackName: aws.String(s.StackName()), TemplateURL: aws.String(cftemplate), NotificationARNs: []*string{ aws.String(s.topicARN), @@ -111,7 +116,7 @@ func (s *Cloudformation) UpdateStack(updated *awsV1alpha1.ECRRepository) (output cftemplate := helpers.GetCloudFormationTemplate(s.config, "ecrrepository", updated.Spec.CloudFormationTemplateName, updated.Spec.CloudFormationTemplateNamespace) stackInputs := cloudformation.UpdateStackInput{ - StackName: aws.String(s.ECRRepository.Name), + StackName: aws.String(s.StackName()), TemplateURL: aws.String(cftemplate), NotificationARNs: []*string{ aws.String(s.topicARN), @@ -156,8 +161,21 @@ func (s *Cloudformation) DeleteStack() (err error) { svc := cloudformation.New(sess) stackInputs := cloudformation.DeleteStackInput{} - stackInputs.SetStackName(s.ECRRepository.Name) + stackInputs.SetStackName(s.StackName()) _, err = svc.DeleteStack(&stackInputs) return } + +// WaitUntilStackDeleted will delete the stack +func (s *Cloudformation) WaitUntilStackDeleted() (err error) { + sess := s.config.AWSSession + svc := cloudformation.New(sess) + + stackInputs := cloudformation.DescribeStacksInput{ + StackName: aws.String(s.StackName()), + } + + err = svc.WaitUntilStackDeleteComplete(&stackInputs) + return +} diff --git a/pkg/operator/ecrrepository/controller.go b/pkg/operator/ecrrepository/controller.go index 13e096c25..6da159640 100644 --- a/pkg/operator/ecrrepository/controller.go +++ b/pkg/operator/ecrrepository/controller.go @@ -92,9 +92,26 @@ func QueueUpdater(config *config.Config, msg *queue.MessageBody) error { } if name != "" && namespace != "" { - err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) - if err != nil { - return err + if msg.ParsedMessage["ResourceStatus"] == "ROLLBACK_COMPLETE" { + err := deleteStack(config, name, namespace, msg.ParsedMessage["StackId"]) + if err != nil { + return err + } + } else if msg.ParsedMessage["ResourceStatus"] == "DELETE_COMPLETE" { + err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) + if err != nil { + return err + } + + err = incrementRollbackCount(config, name, namespace) + if err != nil { + return err + } + } else { + err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) + if err != nil { + return err + } } } @@ -103,7 +120,7 @@ func QueueUpdater(config *config.Config, msg *queue.MessageBody) error { func (c *Controller) onAdd(obj interface{}) { s := obj.(*awsV1alpha1.ECRRepository).DeepCopy() - if s.Status.ResourceStatus == "" { + if s.Status.ResourceStatus == "" || s.Status.ResourceStatus == "DELETE_COMPLETE" { cft := New(c.config, s, c.topicARN) output, err := cft.CreateStack() if err != nil { @@ -123,7 +140,11 @@ func (c *Controller) onAdd(obj interface{}) { func (c *Controller) onUpdate(oldObj, newObj interface{}) { oo := oldObj.(*awsV1alpha1.ECRRepository).DeepCopy() no := newObj.(*awsV1alpha1.ECRRepository).DeepCopy() - if helpers.IsStackComplete(oo.Status.ResourceStatus, false) { + + if no.Status.ResourceStatus == "DELETE_COMPLETE" { + c.onAdd(no) + } + if helpers.IsStackComplete(oo.Status.ResourceStatus, false) && !reflect.DeepEqual(oo.Spec, no.Spec) { cft := New(c.config, oo, c.topicARN) output, err := cft.UpdateStack(no) if err != nil { @@ -151,43 +172,82 @@ func (c *Controller) onDelete(obj interface{}) { c.config.Logger.Infof("deleted ecrrepository '%s'", s.Name) } +func incrementRollbackCount(config *config.Config, name string, namespace string) error { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.ECRRepositories(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting ecrrepositories") + return err + } + + resourceCopy := resource.DeepCopy() + resourceCopy.Spec.RollbackCount = resourceCopy.Spec.RollbackCount+1 + + _, err = clientSet.ECRRepositories(namespace).Update(resourceCopy) + if err != nil { + logger.WithError(err).Error("error updating resource") + return err + } + return nil +} + func updateStatus(config *config.Config, name string, namespace string, stackID string, status string, reason string) error { - logger := config.Logger - clientSet, _ := awsclient.NewForConfig(config.RESTConfig) - resource, err := clientSet.ECRRepositories(namespace).Get(name, metav1.GetOptions{}) + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.ECRRepositories(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting ecrrepositories") + return err + } + + resourceCopy := resource.DeepCopy() + resourceCopy.Status.ResourceStatus = status + resourceCopy.Status.ResourceStatusReason = reason + resourceCopy.Status.StackID = stackID + + if helpers.IsStackComplete(status, false) { + cft := New(config, resourceCopy, "") + outputs, err := cft.GetOutputs() if err != nil { - logger.WithError(err).Error("error getting ecrrepositories") - return err + logger.WithError(err).Error("error getting outputs") } + resourceCopy.Output.RepositoryName = outputs["RepositoryName"] + resourceCopy.Output.RepositoryARN = outputs["RepositoryARN"] + repositoryURL, _ := helpers.Templatize("{{.Config.AccountID}}.dkr.ecr.{{.Config.Region}}.amazonaws.com/{{.Obj.Name}}", helpers.Data{Obj: resourceCopy, Config: config}) + resourceCopy.Output.RepositoryURL = repositoryURL + } - resourceCopy := resource.DeepCopy() - resourceCopy.Status.ResourceStatus = status - resourceCopy.Status.ResourceStatusReason = reason - resourceCopy.Status.StackID = stackID + _, err = clientSet.ECRRepositories(namespace).Update(resourceCopy) + if err != nil { + logger.WithError(err).Error("error updating resource") + return err + } - if helpers.IsStackComplete(status, true) { - cft := New(config, resourceCopy, "") - outputs, err := cft.GetOutputs() - if err != nil { - logger.WithError(err).Error("error getting outputs") - } - resourceCopy.Output.RepositoryName = outputs["RepositoryName"] - resourceCopy.Output.RepositoryARN = outputs["RepositoryARN"] - repositoryURL, _ := helpers.Templatize("{{.Config.AccountID}}.dkr.ecr.{{.Config.Region}}.amazonaws.com/{{.Obj.Name}}", helpers.Data{Obj: resourceCopy, Config: config}) - resourceCopy.Output.RepositoryURL = repositoryURL - } + err = syncAdditionalResources(config, resourceCopy) + if err != nil { + logger.WithError(err).Info("error syncing resources") + } + return nil +} - _, err = clientSet.ECRRepositories(namespace).Update(resourceCopy) - if err != nil { - logger.WithError(err).Error("error updating resource") - return err - } +func deleteStack(config *config.Config, name string, namespace string, stackID string) error { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.ECRRepositories(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting ecrrepositories") + return err + } - err = syncAdditionalResources(config, resourceCopy) - if err != nil { - logger.WithError(err).Info("error syncing resources") - } - return nil + cft := New(config, resource, "") + err = cft.DeleteStack() + if err != nil { + return err + } + + err = cft.WaitUntilStackDeleted() + return err } func syncAdditionalResources(config *config.Config, s *awsV1alpha1.ECRRepository) (err error) { diff --git a/pkg/operator/s3bucket/cft.go b/pkg/operator/s3bucket/cft.go index 96161f08a..2550b05db 100644 --- a/pkg/operator/s3bucket/cft.go +++ b/pkg/operator/s3bucket/cft.go @@ -30,6 +30,11 @@ type Cloudformation struct { topicARN string } +// StackName returns the name of the stack based on the aws-operator-config +func (s *Cloudformation) StackName() string { + return helpers.StackName(s.config.ClusterName, "s3bucket", s.S3Bucket.Name, s.S3Bucket.Namespace) +} + // GetOutputs return the stack outputs from the DescribeStacks call func (s *Cloudformation) GetOutputs() (map[string]string, error) { outputs := map[string]string{} @@ -37,7 +42,7 @@ func (s *Cloudformation) GetOutputs() (map[string]string, error) { svc := cloudformation.New(sess) stackInputs := cloudformation.DescribeStacksInput{ - StackName: aws.String(s.S3Bucket.Name), + StackName: aws.String(s.StackName()), } output, err := svc.DescribeStacks(&stackInputs) @@ -64,7 +69,7 @@ func (s *Cloudformation) CreateStack() (output *cloudformation.CreateStackOutput cftemplate := helpers.GetCloudFormationTemplate(s.config, "s3bucket", s.S3Bucket.Spec.CloudFormationTemplateName, s.S3Bucket.Spec.CloudFormationTemplateNamespace) stackInputs := cloudformation.CreateStackInput{ - StackName: aws.String(s.S3Bucket.Name), + StackName: aws.String(s.StackName()), TemplateURL: aws.String(cftemplate), NotificationARNs: []*string{ aws.String(s.topicARN), @@ -76,9 +81,30 @@ func (s *Cloudformation) CreateStack() (output *cloudformation.CreateStackOutput namespace := helpers.CreateParam("Namespace", s.S3Bucket.Namespace) clusterName := helpers.CreateParam("ClusterName", s.config.ClusterName) bucketName := helpers.CreateParam("BucketName", helpers.Stringify(s.S3Bucket.Name)) - versioning := helpers.CreateParam("EnableVersioning", helpers.Stringify(s.S3Bucket.Spec.Versioning)) - loggingenabled := helpers.CreateParam("EnableLogging", helpers.Stringify(s.S3Bucket.Spec.Logging.Enabled)) - loggingprefix := helpers.CreateParam("LoggingPrefix", helpers.Stringify(s.S3Bucket.Spec.Logging.Prefix)) + versioningTemp := "{{.Obj.Spec.Versioning}}" + versioningValue, err := helpers.Templatize(versioningTemp, helpers.Data{Obj: s.S3Bucket, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + versioning := helpers.CreateParam("EnableVersioning", helpers.Stringify(versioningValue)) + accessControlTemp := "{{.Obj.Spec.AccessControl}}" + accessControlValue, err := helpers.Templatize(accessControlTemp, helpers.Data{Obj: s.S3Bucket, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + accessControl := helpers.CreateParam("BucketAccessControl", helpers.Stringify(accessControlValue)) + loggingenabledTemp := "{{.Obj.Spec.Logging.Enabled}}" + loggingenabledValue, err := helpers.Templatize(loggingenabledTemp, helpers.Data{Obj: s.S3Bucket, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + loggingenabled := helpers.CreateParam("EnableLogging", helpers.Stringify(loggingenabledValue)) + loggingprefixTemp := "{{.Obj.Spec.Logging.Prefix}}" + loggingprefixValue, err := helpers.Templatize(loggingprefixTemp, helpers.Data{Obj: s.S3Bucket, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + loggingprefix := helpers.CreateParam("LoggingPrefix", helpers.Stringify(loggingprefixValue)) parameters := []*cloudformation.Parameter{} parameters = append(parameters, resourceName) @@ -87,6 +113,7 @@ func (s *Cloudformation) CreateStack() (output *cloudformation.CreateStackOutput parameters = append(parameters, clusterName) parameters = append(parameters, bucketName) parameters = append(parameters, versioning) + parameters = append(parameters, accessControl) parameters = append(parameters, loggingenabled) parameters = append(parameters, loggingprefix) @@ -117,7 +144,7 @@ func (s *Cloudformation) UpdateStack(updated *awsV1alpha1.S3Bucket) (output *clo cftemplate := helpers.GetCloudFormationTemplate(s.config, "s3bucket", updated.Spec.CloudFormationTemplateName, updated.Spec.CloudFormationTemplateNamespace) stackInputs := cloudformation.UpdateStackInput{ - StackName: aws.String(s.S3Bucket.Name), + StackName: aws.String(s.StackName()), TemplateURL: aws.String(cftemplate), NotificationARNs: []*string{ aws.String(s.topicARN), @@ -129,9 +156,30 @@ func (s *Cloudformation) UpdateStack(updated *awsV1alpha1.S3Bucket) (output *clo namespace := helpers.CreateParam("Namespace", s.S3Bucket.Namespace) clusterName := helpers.CreateParam("ClusterName", s.config.ClusterName) bucketName := helpers.CreateParam("BucketName", helpers.Stringify(s.S3Bucket.Name)) - versioning := helpers.CreateParam("EnableVersioning", helpers.Stringify(updated.Spec.Versioning)) - loggingenabled := helpers.CreateParam("EnableLogging", helpers.Stringify(updated.Spec.Logging.Enabled)) - loggingprefix := helpers.CreateParam("LoggingPrefix", helpers.Stringify(updated.Spec.Logging.Prefix)) + versioningTemp := "{{.Obj.Spec.Versioning}}" + versioningValue, err := helpers.Templatize(versioningTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + versioning := helpers.CreateParam("EnableVersioning", helpers.Stringify(versioningValue)) + accessControlTemp := "{{.Obj.Spec.AccessControl}}" + accessControlValue, err := helpers.Templatize(accessControlTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + accessControl := helpers.CreateParam("BucketAccessControl", helpers.Stringify(accessControlValue)) + loggingenabledTemp := "{{.Obj.Spec.Logging.Enabled}}" + loggingenabledValue, err := helpers.Templatize(loggingenabledTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + loggingenabled := helpers.CreateParam("EnableLogging", helpers.Stringify(loggingenabledValue)) + loggingprefixTemp := "{{.Obj.Spec.Logging.Prefix}}" + loggingprefixValue, err := helpers.Templatize(loggingprefixTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + loggingprefix := helpers.CreateParam("LoggingPrefix", helpers.Stringify(loggingprefixValue)) parameters := []*cloudformation.Parameter{} parameters = append(parameters, resourceName) @@ -140,6 +188,7 @@ func (s *Cloudformation) UpdateStack(updated *awsV1alpha1.S3Bucket) (output *clo parameters = append(parameters, clusterName) parameters = append(parameters, bucketName) parameters = append(parameters, versioning) + parameters = append(parameters, accessControl) parameters = append(parameters, loggingenabled) parameters = append(parameters, loggingprefix) @@ -168,8 +217,21 @@ func (s *Cloudformation) DeleteStack() (err error) { svc := cloudformation.New(sess) stackInputs := cloudformation.DeleteStackInput{} - stackInputs.SetStackName(s.S3Bucket.Name) + stackInputs.SetStackName(s.StackName()) _, err = svc.DeleteStack(&stackInputs) return } + +// WaitUntilStackDeleted will delete the stack +func (s *Cloudformation) WaitUntilStackDeleted() (err error) { + sess := s.config.AWSSession + svc := cloudformation.New(sess) + + stackInputs := cloudformation.DescribeStacksInput{ + StackName: aws.String(s.StackName()), + } + + err = svc.WaitUntilStackDeleteComplete(&stackInputs) + return +} diff --git a/pkg/operator/s3bucket/controller.go b/pkg/operator/s3bucket/controller.go index e459fdb27..8c0175cc0 100644 --- a/pkg/operator/s3bucket/controller.go +++ b/pkg/operator/s3bucket/controller.go @@ -93,9 +93,26 @@ func QueueUpdater(config *config.Config, msg *queue.MessageBody) error { } if name != "" && namespace != "" { - err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) - if err != nil { - return err + if msg.ParsedMessage["ResourceStatus"] == "ROLLBACK_COMPLETE" { + err := deleteStack(config, name, namespace, msg.ParsedMessage["StackId"]) + if err != nil { + return err + } + } else if msg.ParsedMessage["ResourceStatus"] == "DELETE_COMPLETE" { + err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) + if err != nil { + return err + } + + err = incrementRollbackCount(config, name, namespace) + if err != nil { + return err + } + } else { + err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) + if err != nil { + return err + } } } @@ -104,7 +121,7 @@ func QueueUpdater(config *config.Config, msg *queue.MessageBody) error { func (c *Controller) onAdd(obj interface{}) { s := obj.(*awsV1alpha1.S3Bucket).DeepCopy() - if s.Status.ResourceStatus == "" { + if s.Status.ResourceStatus == "" || s.Status.ResourceStatus == "DELETE_COMPLETE" { cft := New(c.config, s, c.topicARN) output, err := cft.CreateStack() if err != nil { @@ -124,7 +141,11 @@ func (c *Controller) onAdd(obj interface{}) { func (c *Controller) onUpdate(oldObj, newObj interface{}) { oo := oldObj.(*awsV1alpha1.S3Bucket).DeepCopy() no := newObj.(*awsV1alpha1.S3Bucket).DeepCopy() - if helpers.IsStackComplete(oo.Status.ResourceStatus, false) { + + if no.Status.ResourceStatus == "DELETE_COMPLETE" { + c.onAdd(no) + } + if helpers.IsStackComplete(oo.Status.ResourceStatus, false) && !reflect.DeepEqual(oo.Spec, no.Spec) { cft := New(c.config, oo, c.topicARN) output, err := cft.UpdateStack(no) if err != nil { @@ -152,41 +173,80 @@ func (c *Controller) onDelete(obj interface{}) { c.config.Logger.Infof("deleted s3bucket '%s'", s.Name) } +func incrementRollbackCount(config *config.Config, name string, namespace string) error { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.S3Buckets(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting s3buckets") + return err + } + + resourceCopy := resource.DeepCopy() + resourceCopy.Spec.RollbackCount = resourceCopy.Spec.RollbackCount+1 + + _, err = clientSet.S3Buckets(namespace).Update(resourceCopy) + if err != nil { + logger.WithError(err).Error("error updating resource") + return err + } + return nil +} + func updateStatus(config *config.Config, name string, namespace string, stackID string, status string, reason string) error { - logger := config.Logger - clientSet, _ := awsclient.NewForConfig(config.RESTConfig) - resource, err := clientSet.S3Buckets(namespace).Get(name, metav1.GetOptions{}) + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.S3Buckets(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting s3buckets") + return err + } + + resourceCopy := resource.DeepCopy() + resourceCopy.Status.ResourceStatus = status + resourceCopy.Status.ResourceStatusReason = reason + resourceCopy.Status.StackID = stackID + + if helpers.IsStackComplete(status, false) { + cft := New(config, resourceCopy, "") + outputs, err := cft.GetOutputs() if err != nil { - logger.WithError(err).Error("error getting s3buckets") - return err + logger.WithError(err).Error("error getting outputs") } + resourceCopy.Output.BucketName = outputs["BucketName"] + resourceCopy.Output.BucketARN = outputs["BucketArn"] + } - resourceCopy := resource.DeepCopy() - resourceCopy.Status.ResourceStatus = status - resourceCopy.Status.ResourceStatusReason = reason - resourceCopy.Status.StackID = stackID + _, err = clientSet.S3Buckets(namespace).Update(resourceCopy) + if err != nil { + logger.WithError(err).Error("error updating resource") + return err + } - if helpers.IsStackComplete(status, true) { - cft := New(config, resourceCopy, "") - outputs, err := cft.GetOutputs() - if err != nil { - logger.WithError(err).Error("error getting outputs") - } - resourceCopy.Output.BucketName = outputs["BucketName"] - resourceCopy.Output.BucketARN = outputs["BucketArn"] - } + err = syncAdditionalResources(config, resourceCopy) + if err != nil { + logger.WithError(err).Info("error syncing resources") + } + return nil +} - _, err = clientSet.S3Buckets(namespace).Update(resourceCopy) - if err != nil { - logger.WithError(err).Error("error updating resource") - return err - } +func deleteStack(config *config.Config, name string, namespace string, stackID string) error { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.S3Buckets(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting s3buckets") + return err + } - err = syncAdditionalResources(config, resourceCopy) - if err != nil { - logger.WithError(err).Info("error syncing resources") - } - return nil + cft := New(config, resource, "") + err = cft.DeleteStack() + if err != nil { + return err + } + + err = cft.WaitUntilStackDeleted() + return err } func syncAdditionalResources(config *config.Config, s *awsV1alpha1.S3Bucket) (err error) { diff --git a/pkg/operator/snssubscription/cft.go b/pkg/operator/snssubscription/cft.go new file mode 100644 index 000000000..1843486e4 --- /dev/null +++ b/pkg/operator/snssubscription/cft.go @@ -0,0 +1,233 @@ +// >>>>>>> DO NOT EDIT THIS FILE <<<<<<<<<< +// This file is autogenerated via `aws-operator-codegen process` +// If you'd like the change anything about this file make edits to the .templ +// file in the pkg/codegen/assets directory. + +package snssubscription + +import ( + "errors" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/cloudformation" + awsV1alpha1 "github.com/christopherhein/aws-operator/pkg/apis/operator.aws/v1alpha1" + "github.com/christopherhein/aws-operator/pkg/config" + "github.com/christopherhein/aws-operator/pkg/helpers" +) + +// New generates a new object +func New(config *config.Config, snssubscription *awsV1alpha1.SNSSubscription, topicARN string) *Cloudformation { + return &Cloudformation{ + SNSSubscription: snssubscription, + config: config, + topicARN: topicARN, + } +} + +// Cloudformation defines the snssubscription cfts +type Cloudformation struct { + config *config.Config + SNSSubscription *awsV1alpha1.SNSSubscription + topicARN string +} + +// StackName returns the name of the stack based on the aws-operator-config +func (s *Cloudformation) StackName() string { + return helpers.StackName(s.config.ClusterName, "snssubscription", s.SNSSubscription.Name, s.SNSSubscription.Namespace) +} + +// GetOutputs return the stack outputs from the DescribeStacks call +func (s *Cloudformation) GetOutputs() (map[string]string, error) { + outputs := map[string]string{} + sess := s.config.AWSSession + svc := cloudformation.New(sess) + + stackInputs := cloudformation.DescribeStacksInput{ + StackName: aws.String(s.StackName()), + } + + output, err := svc.DescribeStacks(&stackInputs) + if err != nil { + return nil, err + } + // Not sure if this is even possible + if len(output.Stacks) != 1 { + return nil, errors.New("no stacks returned with that stack name") + } + + for _, out := range output.Stacks[0].Outputs { + outputs[*out.OutputKey] = *out.OutputValue + } + + return outputs, err +} + +// CreateStack will create the stack with the supplied params +func (s *Cloudformation) CreateStack() (output *cloudformation.CreateStackOutput, err error) { + sess := s.config.AWSSession + svc := cloudformation.New(sess) + + cftemplate := helpers.GetCloudFormationTemplate(s.config, "snssubscription", s.SNSSubscription.Spec.CloudFormationTemplateName, s.SNSSubscription.Spec.CloudFormationTemplateNamespace) + + stackInputs := cloudformation.CreateStackInput{ + StackName: aws.String(s.StackName()), + TemplateURL: aws.String(cftemplate), + NotificationARNs: []*string{ + aws.String(s.topicARN), + }, + } + + resourceName := helpers.CreateParam("ResourceName", s.SNSSubscription.Name) + resourceVersion := helpers.CreateParam("ResourceVersion", s.SNSSubscription.ResourceVersion) + namespace := helpers.CreateParam("Namespace", s.SNSSubscription.Namespace) + clusterName := helpers.CreateParam("ClusterName", s.config.ClusterName) + topicNameTemp := "{{(call .Helpers.GetSNSTopicByName .Config .Obj.Spec.TopicName .Obj.Namespace).Output.TopicARN}}" + topicNameValue, err := helpers.Templatize(topicNameTemp, helpers.Data{Obj: s.SNSSubscription, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + topicName := helpers.CreateParam("TopicARN", helpers.Stringify(topicNameValue)) + protocolTemp := "{{.Obj.Spec.Protocol}}" + protocolValue, err := helpers.Templatize(protocolTemp, helpers.Data{Obj: s.SNSSubscription, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + protocol := helpers.CreateParam("Protocol", helpers.Stringify(protocolValue)) + endpointTemp := "{{if (eq .Obj.Spec.Protocol \"sqs\")}}{{(call .Helpers.GetSQSQueueByName .Config .Obj.Spec.Endpoint .Obj.Namespace).Output.QueueARN }}{{else}}{{.Obj.Spec.Endpoint}}{{end}}" + endpointValue, err := helpers.Templatize(endpointTemp, helpers.Data{Obj: s.SNSSubscription, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + endpoint := helpers.CreateParam("Endpoint", helpers.Stringify(endpointValue)) + queueURLTemp := "{{(call .Helpers.GetSQSQueueByName .Config .Obj.Spec.Endpoint .Obj.Namespace).Output.QueueURL }}" + queueURLValue, err := helpers.Templatize(queueURLTemp, helpers.Data{Obj: s.SNSSubscription, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + queueURL := helpers.CreateParam("QueueURL", helpers.Stringify(queueURLValue)) + + parameters := []*cloudformation.Parameter{} + parameters = append(parameters, resourceName) + parameters = append(parameters, resourceVersion) + parameters = append(parameters, namespace) + parameters = append(parameters, clusterName) + parameters = append(parameters, topicName) + parameters = append(parameters, protocol) + parameters = append(parameters, endpoint) + parameters = append(parameters, queueURL) + + stackInputs.SetParameters(parameters) + + resourceNameTag := helpers.CreateTag("ResourceName", s.SNSSubscription.Name) + resourceVersionTag := helpers.CreateTag("ResourceVersion", s.SNSSubscription.ResourceVersion) + namespaceTag := helpers.CreateTag("Namespace", s.SNSSubscription.Namespace) + clusterNameTag := helpers.CreateTag("ClusterName", s.config.ClusterName) + + tags := []*cloudformation.Tag{} + tags = append(tags, resourceNameTag) + tags = append(tags, resourceVersionTag) + tags = append(tags, namespaceTag) + tags = append(tags, clusterNameTag) + + stackInputs.SetTags(tags) + + output, err = svc.CreateStack(&stackInputs) + return +} + +// UpdateStack will update the existing stack +func (s *Cloudformation) UpdateStack(updated *awsV1alpha1.SNSSubscription) (output *cloudformation.UpdateStackOutput, err error) { + sess := s.config.AWSSession + svc := cloudformation.New(sess) + + cftemplate := helpers.GetCloudFormationTemplate(s.config, "snssubscription", updated.Spec.CloudFormationTemplateName, updated.Spec.CloudFormationTemplateNamespace) + + stackInputs := cloudformation.UpdateStackInput{ + StackName: aws.String(s.StackName()), + TemplateURL: aws.String(cftemplate), + NotificationARNs: []*string{ + aws.String(s.topicARN), + }, + } + + resourceName := helpers.CreateParam("ResourceName", s.SNSSubscription.Name) + resourceVersion := helpers.CreateParam("ResourceVersion", s.SNSSubscription.ResourceVersion) + namespace := helpers.CreateParam("Namespace", s.SNSSubscription.Namespace) + clusterName := helpers.CreateParam("ClusterName", s.config.ClusterName) + topicNameTemp := "{{(call .Helpers.GetSNSTopicByName .Config .Obj.Spec.TopicName .Obj.Namespace).Output.TopicARN}}" + topicNameValue, err := helpers.Templatize(topicNameTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + topicName := helpers.CreateParam("TopicARN", helpers.Stringify(topicNameValue)) + protocolTemp := "{{.Obj.Spec.Protocol}}" + protocolValue, err := helpers.Templatize(protocolTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + protocol := helpers.CreateParam("Protocol", helpers.Stringify(protocolValue)) + endpointTemp := "{{if (eq .Obj.Spec.Protocol \"sqs\")}}{{(call .Helpers.GetSQSQueueByName .Config .Obj.Spec.Endpoint .Obj.Namespace).Output.QueueARN }}{{else}}{{.Obj.Spec.Endpoint}}{{end}}" + endpointValue, err := helpers.Templatize(endpointTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + endpoint := helpers.CreateParam("Endpoint", helpers.Stringify(endpointValue)) + queueURLTemp := "{{(call .Helpers.GetSQSQueueByName .Config .Obj.Spec.Endpoint .Obj.Namespace).Output.QueueURL }}" + queueURLValue, err := helpers.Templatize(queueURLTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + queueURL := helpers.CreateParam("QueueURL", helpers.Stringify(queueURLValue)) + + parameters := []*cloudformation.Parameter{} + parameters = append(parameters, resourceName) + parameters = append(parameters, resourceVersion) + parameters = append(parameters, namespace) + parameters = append(parameters, clusterName) + parameters = append(parameters, topicName) + parameters = append(parameters, protocol) + parameters = append(parameters, endpoint) + parameters = append(parameters, queueURL) + + stackInputs.SetParameters(parameters) + + resourceNameTag := helpers.CreateTag("ResourceName", s.SNSSubscription.Name) + resourceVersionTag := helpers.CreateTag("ResourceVersion", s.SNSSubscription.ResourceVersion) + namespaceTag := helpers.CreateTag("Namespace", s.SNSSubscription.Namespace) + clusterNameTag := helpers.CreateTag("ClusterName", s.config.ClusterName) + + tags := []*cloudformation.Tag{} + tags = append(tags, resourceNameTag) + tags = append(tags, resourceVersionTag) + tags = append(tags, namespaceTag) + tags = append(tags, clusterNameTag) + + stackInputs.SetTags(tags) + + output, err = svc.UpdateStack(&stackInputs) + return +} + +// DeleteStack will delete the stack +func (s *Cloudformation) DeleteStack() (err error) { + sess := s.config.AWSSession + svc := cloudformation.New(sess) + + stackInputs := cloudformation.DeleteStackInput{} + stackInputs.SetStackName(s.StackName()) + + _, err = svc.DeleteStack(&stackInputs) + return +} + +// WaitUntilStackDeleted will delete the stack +func (s *Cloudformation) WaitUntilStackDeleted() (err error) { + sess := s.config.AWSSession + svc := cloudformation.New(sess) + + stackInputs := cloudformation.DescribeStacksInput{ + StackName: aws.String(s.StackName()), + } + + err = svc.WaitUntilStackDeleteComplete(&stackInputs) + return +} diff --git a/pkg/operator/snssubscription/controller.go b/pkg/operator/snssubscription/controller.go new file mode 100644 index 000000000..0fd0d7b4a --- /dev/null +++ b/pkg/operator/snssubscription/controller.go @@ -0,0 +1,266 @@ +// >>>>>>> DO NOT EDIT THIS FILE <<<<<<<<<< +// This file is autogenerated via `aws-operator generate` +// If you'd like the change anything about this file make edits to the .templ +// file in the pkg/codegen/assets directory. + +package snssubscription + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/christopherhein/aws-operator/pkg/helpers" + "reflect" + + "github.com/christopherhein/aws-operator/pkg/config" + "github.com/christopherhein/aws-operator/pkg/queue" + opkit "github.com/christopherhein/operator-kit" + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/client-go/tools/cache" + + awsapi "github.com/christopherhein/aws-operator/pkg/apis/operator.aws" + awsV1alpha1 "github.com/christopherhein/aws-operator/pkg/apis/operator.aws/v1alpha1" + awsclient "github.com/christopherhein/aws-operator/pkg/client/clientset/versioned/typed/operator.aws/v1alpha1" +) + +// Resource is the object store definition +var Resource = opkit.CustomResource{ + Name: "snssubscription", + Plural: "snssubscriptions", + Group: awsapi.GroupName, + Version: awsapi.Version, + Scope: apiextensionsv1beta1.NamespaceScoped, + Kind: reflect.TypeOf(awsV1alpha1.SNSSubscription{}).Name(), + ShortNames: []string{ + "subscription", + }, +} + +// Controller represents a controller object for object store custom resources +type Controller struct { + config *config.Config + context *opkit.Context + awsclientset awsclient.OperatorV1alpha1Interface + topicARN string +} + +// NewController create controller for watching object store custom resources created +func NewController(config *config.Config, context *opkit.Context, awsclientset awsclient.OperatorV1alpha1Interface) *Controller { + return &Controller{ + config: config, + context: context, + awsclientset: awsclientset, + } +} + +// StartWatch watches for instances of Object Store custom resources and acts on them +func (c *Controller) StartWatch(namespace string, stopCh chan struct{}) error { + resourceHandlers := cache.ResourceEventHandlerFuncs{ + AddFunc: c.onAdd, + UpdateFunc: c.onUpdate, + DeleteFunc: c.onDelete, + } + queuectrl := queue.New(c.config, c.context, c.awsclientset, 1) + c.topicARN, _, _, _ = queuectrl.Register("snssubscription", &awsV1alpha1.SNSSubscription{}) + go queuectrl.StartWatch(queue.HandlerFunc(QueueUpdater), stopCh) + + restClient := c.awsclientset.RESTClient() + watcher := opkit.NewWatcher(Resource, namespace, resourceHandlers, restClient) + go watcher.Watch(&awsV1alpha1.SNSSubscription{}, stopCh) + + return nil +} +// QueueUpdater will take the messages from the queue and process them +func QueueUpdater(config *config.Config, msg *queue.MessageBody) error { + logger := config.Logger + var name, namespace string + if msg.Updatable { + name = msg.ResourceName + namespace = msg.Namespace + } else { + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resources, err := clientSet.SNSSubscriptions("").List(metav1.ListOptions{}) + if err != nil { + logger.WithError(err).Error("error getting snssubscriptions") + return err + } + for _, resource := range resources.Items { + if resource.Status.StackID == msg.ParsedMessage["StackId"] { + name = resource.Name + namespace = resource.Namespace + } + } + } + + if name != "" && namespace != "" { + if msg.ParsedMessage["ResourceStatus"] == "ROLLBACK_COMPLETE" { + err := deleteStack(config, name, namespace, msg.ParsedMessage["StackId"]) + if err != nil { + return err + } + } else if msg.ParsedMessage["ResourceStatus"] == "DELETE_COMPLETE" { + err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) + if err != nil { + return err + } + + err = incrementRollbackCount(config, name, namespace) + if err != nil { + return err + } + } else { + err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) + if err != nil { + return err + } + } + } + + return nil +} + +func (c *Controller) onAdd(obj interface{}) { + s := obj.(*awsV1alpha1.SNSSubscription).DeepCopy() + if s.Status.ResourceStatus == "" || s.Status.ResourceStatus == "DELETE_COMPLETE" { + cft := New(c.config, s, c.topicARN) + output, err := cft.CreateStack() + if err != nil { + c.config.Logger.WithError(err).Errorf("error creating snssubscription '%s'", s.Name) + return + } + c.config.Logger.Infof("added snssubscription '%s' with stackID '%s'", s.Name, string(*output.StackId)) + c.config.Logger.Infof("view at https://console.aws.amazon.com/cloudformation/home?#/stack/detail?stackId=%s", string(*output.StackId)) + + err = updateStatus(c.config, s.Name, s.Namespace, string(*output.StackId), "CREATE_IN_PROGRESS", "") + if err != nil { + c.config.Logger.WithError(err).Error("error updating status") + } + } +} + +func (c *Controller) onUpdate(oldObj, newObj interface{}) { + oo := oldObj.(*awsV1alpha1.SNSSubscription).DeepCopy() + no := newObj.(*awsV1alpha1.SNSSubscription).DeepCopy() + + if no.Status.ResourceStatus == "DELETE_COMPLETE" { + c.onAdd(no) + } + if helpers.IsStackComplete(oo.Status.ResourceStatus, false) && !reflect.DeepEqual(oo.Spec, no.Spec) { + cft := New(c.config, oo, c.topicARN) + output, err := cft.UpdateStack(no) + if err != nil { + c.config.Logger.WithError(err).Errorf("error updating snssubscription '%s' with new params %+v and old %+v", no.Name, no, oo) + return + } + c.config.Logger.Infof("updated snssubscription '%s' with params '%s'", no.Name, string(*output.StackId)) + c.config.Logger.Infof("view at https://console.aws.amazon.com/cloudformation/home?#/stack/detail?stackId=%s", string(*output.StackId)) + + err = updateStatus(c.config, oo.Name, oo.Namespace, string(*output.StackId), "UPDATE_IN_PROGRESS", "") + if err != nil { + c.config.Logger.WithError(err).Error("error updating status") + } + } +} + +func (c *Controller) onDelete(obj interface{}) { + s := obj.(*awsV1alpha1.SNSSubscription).DeepCopy() + cft := New(c.config, s, c.topicARN) + err := cft.DeleteStack() + if err != nil { + c.config.Logger.WithError(err).Errorf("error deleting snssubscription '%s'", s.Name) + return + } + + c.config.Logger.Infof("deleted snssubscription '%s'", s.Name) +} +func incrementRollbackCount(config *config.Config, name string, namespace string) error { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.SNSSubscriptions(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting snssubscriptions") + return err + } + + resourceCopy := resource.DeepCopy() + resourceCopy.Spec.RollbackCount = resourceCopy.Spec.RollbackCount+1 + + _, err = clientSet.SNSSubscriptions(namespace).Update(resourceCopy) + if err != nil { + logger.WithError(err).Error("error updating resource") + return err + } + return nil +} + +func updateStatus(config *config.Config, name string, namespace string, stackID string, status string, reason string) error { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.SNSSubscriptions(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting snssubscriptions") + return err + } + + resourceCopy := resource.DeepCopy() + resourceCopy.Status.ResourceStatus = status + resourceCopy.Status.ResourceStatusReason = reason + resourceCopy.Status.StackID = stackID + + if helpers.IsStackComplete(status, false) { + cft := New(config, resourceCopy, "") + outputs, err := cft.GetOutputs() + if err != nil { + logger.WithError(err).Error("error getting outputs") + } + resourceCopy.Output.SubscriptionARN = outputs["SubscriptionARN"] + } + + _, err = clientSet.SNSSubscriptions(namespace).Update(resourceCopy) + if err != nil { + logger.WithError(err).Error("error updating resource") + return err + } + + err = syncAdditionalResources(config, resourceCopy) + if err != nil { + logger.WithError(err).Info("error syncing resources") + } + return nil +} + +func deleteStack(config *config.Config, name string, namespace string, stackID string) error { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.SNSSubscriptions(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting snssubscriptions") + return err + } + + cft := New(config, resource, "") + err = cft.DeleteStack() + if err != nil { + return err + } + + err = cft.WaitUntilStackDeleted() + return err +} + +func syncAdditionalResources(config *config.Config, s *awsV1alpha1.SNSSubscription) (err error) { + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.SNSSubscriptions(s.Namespace).Get(s.Name, metav1.GetOptions{}) + if err != nil { + return err + } + resource = resource.DeepCopy() + + + + + + _, err = clientSet.SNSSubscriptions(s.Namespace).Update(resource) + if err != nil { + return err + } + return nil +} diff --git a/pkg/operator/snstopic/cft.go b/pkg/operator/snstopic/cft.go index a6fd93e03..809308c1b 100644 --- a/pkg/operator/snstopic/cft.go +++ b/pkg/operator/snstopic/cft.go @@ -30,6 +30,11 @@ type Cloudformation struct { topicARN string } +// StackName returns the name of the stack based on the aws-operator-config +func (s *Cloudformation) StackName() string { + return helpers.StackName(s.config.ClusterName, "snstopic", s.SNSTopic.Name, s.SNSTopic.Namespace) +} + // GetOutputs return the stack outputs from the DescribeStacks call func (s *Cloudformation) GetOutputs() (map[string]string, error) { outputs := map[string]string{} @@ -37,7 +42,7 @@ func (s *Cloudformation) GetOutputs() (map[string]string, error) { svc := cloudformation.New(sess) stackInputs := cloudformation.DescribeStacksInput{ - StackName: aws.String(s.SNSTopic.Name), + StackName: aws.String(s.StackName()), } output, err := svc.DescribeStacks(&stackInputs) @@ -64,7 +69,7 @@ func (s *Cloudformation) CreateStack() (output *cloudformation.CreateStackOutput cftemplate := helpers.GetCloudFormationTemplate(s.config, "snstopic", s.SNSTopic.Spec.CloudFormationTemplateName, s.SNSTopic.Spec.CloudFormationTemplateNamespace) stackInputs := cloudformation.CreateStackInput{ - StackName: aws.String(s.SNSTopic.Name), + StackName: aws.String(s.StackName()), TemplateURL: aws.String(cftemplate), NotificationARNs: []*string{ aws.String(s.topicARN), @@ -109,7 +114,7 @@ func (s *Cloudformation) UpdateStack(updated *awsV1alpha1.SNSTopic) (output *clo cftemplate := helpers.GetCloudFormationTemplate(s.config, "snstopic", updated.Spec.CloudFormationTemplateName, updated.Spec.CloudFormationTemplateNamespace) stackInputs := cloudformation.UpdateStackInput{ - StackName: aws.String(s.SNSTopic.Name), + StackName: aws.String(s.StackName()), TemplateURL: aws.String(cftemplate), NotificationARNs: []*string{ aws.String(s.topicARN), @@ -152,8 +157,21 @@ func (s *Cloudformation) DeleteStack() (err error) { svc := cloudformation.New(sess) stackInputs := cloudformation.DeleteStackInput{} - stackInputs.SetStackName(s.SNSTopic.Name) + stackInputs.SetStackName(s.StackName()) _, err = svc.DeleteStack(&stackInputs) return } + +// WaitUntilStackDeleted will delete the stack +func (s *Cloudformation) WaitUntilStackDeleted() (err error) { + sess := s.config.AWSSession + svc := cloudformation.New(sess) + + stackInputs := cloudformation.DescribeStacksInput{ + StackName: aws.String(s.StackName()), + } + + err = svc.WaitUntilStackDeleteComplete(&stackInputs) + return +} diff --git a/pkg/operator/snstopic/controller.go b/pkg/operator/snstopic/controller.go index 06f6242ee..c456710e5 100644 --- a/pkg/operator/snstopic/controller.go +++ b/pkg/operator/snstopic/controller.go @@ -93,9 +93,26 @@ func QueueUpdater(config *config.Config, msg *queue.MessageBody) error { } if name != "" && namespace != "" { - err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) - if err != nil { - return err + if msg.ParsedMessage["ResourceStatus"] == "ROLLBACK_COMPLETE" { + err := deleteStack(config, name, namespace, msg.ParsedMessage["StackId"]) + if err != nil { + return err + } + } else if msg.ParsedMessage["ResourceStatus"] == "DELETE_COMPLETE" { + err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) + if err != nil { + return err + } + + err = incrementRollbackCount(config, name, namespace) + if err != nil { + return err + } + } else { + err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) + if err != nil { + return err + } } } @@ -104,7 +121,7 @@ func QueueUpdater(config *config.Config, msg *queue.MessageBody) error { func (c *Controller) onAdd(obj interface{}) { s := obj.(*awsV1alpha1.SNSTopic).DeepCopy() - if s.Status.ResourceStatus == "" { + if s.Status.ResourceStatus == "" || s.Status.ResourceStatus == "DELETE_COMPLETE" { cft := New(c.config, s, c.topicARN) output, err := cft.CreateStack() if err != nil { @@ -124,7 +141,11 @@ func (c *Controller) onAdd(obj interface{}) { func (c *Controller) onUpdate(oldObj, newObj interface{}) { oo := oldObj.(*awsV1alpha1.SNSTopic).DeepCopy() no := newObj.(*awsV1alpha1.SNSTopic).DeepCopy() - if helpers.IsStackComplete(oo.Status.ResourceStatus, false) { + + if no.Status.ResourceStatus == "DELETE_COMPLETE" { + c.onAdd(no) + } + if helpers.IsStackComplete(oo.Status.ResourceStatus, false) && !reflect.DeepEqual(oo.Spec, no.Spec) { cft := New(c.config, oo, c.topicARN) output, err := cft.UpdateStack(no) if err != nil { @@ -152,40 +173,79 @@ func (c *Controller) onDelete(obj interface{}) { c.config.Logger.Infof("deleted snstopic '%s'", s.Name) } +func incrementRollbackCount(config *config.Config, name string, namespace string) error { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.SNSTopics(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting snstopics") + return err + } + + resourceCopy := resource.DeepCopy() + resourceCopy.Spec.RollbackCount = resourceCopy.Spec.RollbackCount+1 + + _, err = clientSet.SNSTopics(namespace).Update(resourceCopy) + if err != nil { + logger.WithError(err).Error("error updating resource") + return err + } + return nil +} + func updateStatus(config *config.Config, name string, namespace string, stackID string, status string, reason string) error { - logger := config.Logger - clientSet, _ := awsclient.NewForConfig(config.RESTConfig) - resource, err := clientSet.SNSTopics(namespace).Get(name, metav1.GetOptions{}) + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.SNSTopics(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting snstopics") + return err + } + + resourceCopy := resource.DeepCopy() + resourceCopy.Status.ResourceStatus = status + resourceCopy.Status.ResourceStatusReason = reason + resourceCopy.Status.StackID = stackID + + if helpers.IsStackComplete(status, false) { + cft := New(config, resourceCopy, "") + outputs, err := cft.GetOutputs() if err != nil { - logger.WithError(err).Error("error getting snstopics") - return err + logger.WithError(err).Error("error getting outputs") } + resourceCopy.Output.TopicARN = outputs["TopicARN"] + } - resourceCopy := resource.DeepCopy() - resourceCopy.Status.ResourceStatus = status - resourceCopy.Status.ResourceStatusReason = reason - resourceCopy.Status.StackID = stackID + _, err = clientSet.SNSTopics(namespace).Update(resourceCopy) + if err != nil { + logger.WithError(err).Error("error updating resource") + return err + } - if helpers.IsStackComplete(status, true) { - cft := New(config, resourceCopy, "") - outputs, err := cft.GetOutputs() - if err != nil { - logger.WithError(err).Error("error getting outputs") - } - resourceCopy.Output.TopicName = outputs["TopicName"] - } + err = syncAdditionalResources(config, resourceCopy) + if err != nil { + logger.WithError(err).Info("error syncing resources") + } + return nil +} - _, err = clientSet.SNSTopics(namespace).Update(resourceCopy) - if err != nil { - logger.WithError(err).Error("error updating resource") - return err - } +func deleteStack(config *config.Config, name string, namespace string, stackID string) error { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.SNSTopics(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting snstopics") + return err + } - err = syncAdditionalResources(config, resourceCopy) - if err != nil { - logger.WithError(err).Info("error syncing resources") - } - return nil + cft := New(config, resource, "") + err = cft.DeleteStack() + if err != nil { + return err + } + + err = cft.WaitUntilStackDeleted() + return err } func syncAdditionalResources(config *config.Config, s *awsV1alpha1.SNSTopic) (err error) { diff --git a/pkg/operator/sqsqueue/cft.go b/pkg/operator/sqsqueue/cft.go index e7374bea0..a81926358 100644 --- a/pkg/operator/sqsqueue/cft.go +++ b/pkg/operator/sqsqueue/cft.go @@ -30,6 +30,11 @@ type Cloudformation struct { topicARN string } +// StackName returns the name of the stack based on the aws-operator-config +func (s *Cloudformation) StackName() string { + return helpers.StackName(s.config.ClusterName, "sqsqueue", s.SQSQueue.Name, s.SQSQueue.Namespace) +} + // GetOutputs return the stack outputs from the DescribeStacks call func (s *Cloudformation) GetOutputs() (map[string]string, error) { outputs := map[string]string{} @@ -37,7 +42,7 @@ func (s *Cloudformation) GetOutputs() (map[string]string, error) { svc := cloudformation.New(sess) stackInputs := cloudformation.DescribeStacksInput{ - StackName: aws.String(s.SQSQueue.Name), + StackName: aws.String(s.StackName()), } output, err := svc.DescribeStacks(&stackInputs) @@ -64,7 +69,7 @@ func (s *Cloudformation) CreateStack() (output *cloudformation.CreateStackOutput cftemplate := helpers.GetCloudFormationTemplate(s.config, "sqsqueue", s.SQSQueue.Spec.CloudFormationTemplateName, s.SQSQueue.Spec.CloudFormationTemplateNamespace) stackInputs := cloudformation.CreateStackInput{ - StackName: aws.String(s.SQSQueue.Name), + StackName: aws.String(s.StackName()), TemplateURL: aws.String(cftemplate), NotificationARNs: []*string{ aws.String(s.topicARN), @@ -75,14 +80,54 @@ func (s *Cloudformation) CreateStack() (output *cloudformation.CreateStackOutput resourceVersion := helpers.CreateParam("ResourceVersion", s.SQSQueue.ResourceVersion) namespace := helpers.CreateParam("Namespace", s.SQSQueue.Namespace) clusterName := helpers.CreateParam("ClusterName", s.config.ClusterName) - contentBasedDeduplication := helpers.CreateParam("ContentBasedDeduplication", helpers.Stringify(s.SQSQueue.Spec.ContentBasedDeduplication)) - delaySeconds := helpers.CreateParam("DelaySeconds", helpers.Stringify(s.SQSQueue.Spec.DelaySeconds)) - maximumMessageSize := helpers.CreateParam("MaximumMessageSize", helpers.Stringify(s.SQSQueue.Spec.MaximumMessageSize)) - messageRetentionPeriod := helpers.CreateParam("MessageRetentionPeriod", helpers.Stringify(s.SQSQueue.Spec.MessageRetentionPeriod)) - receiveMessageWaitTimeSeconds := helpers.CreateParam("ReceiveMessageWaitTimeSeconds", helpers.Stringify(s.SQSQueue.Spec.ReceiveMessageWaitTimeSeconds)) - usedeadletterQueue := helpers.CreateParam("UsedeadletterQueue", helpers.Stringify(s.SQSQueue.Spec.UsedeadletterQueue)) - visibilityTimeout := helpers.CreateParam("VisibilityTimeout", helpers.Stringify(s.SQSQueue.Spec.VisibilityTimeout)) - fifoQueue := helpers.CreateParam("FifoQueue", helpers.Stringify(s.SQSQueue.Spec.FifoQueue)) + contentBasedDeduplicationTemp := "{{.Obj.Spec.ContentBasedDeduplication}}" + contentBasedDeduplicationValue, err := helpers.Templatize(contentBasedDeduplicationTemp, helpers.Data{Obj: s.SQSQueue, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + contentBasedDeduplication := helpers.CreateParam("ContentBasedDeduplication", helpers.Stringify(contentBasedDeduplicationValue)) + delaySecondsTemp := "{{.Obj.Spec.DelaySeconds}}" + delaySecondsValue, err := helpers.Templatize(delaySecondsTemp, helpers.Data{Obj: s.SQSQueue, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + delaySeconds := helpers.CreateParam("DelaySeconds", helpers.Stringify(delaySecondsValue)) + maximumMessageSizeTemp := "{{.Obj.Spec.MaximumMessageSize}}" + maximumMessageSizeValue, err := helpers.Templatize(maximumMessageSizeTemp, helpers.Data{Obj: s.SQSQueue, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + maximumMessageSize := helpers.CreateParam("MaximumMessageSize", helpers.Stringify(maximumMessageSizeValue)) + messageRetentionPeriodTemp := "{{.Obj.Spec.MessageRetentionPeriod}}" + messageRetentionPeriodValue, err := helpers.Templatize(messageRetentionPeriodTemp, helpers.Data{Obj: s.SQSQueue, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + messageRetentionPeriod := helpers.CreateParam("MessageRetentionPeriod", helpers.Stringify(messageRetentionPeriodValue)) + receiveMessageWaitTimeSecondsTemp := "{{.Obj.Spec.ReceiveMessageWaitTimeSeconds}}" + receiveMessageWaitTimeSecondsValue, err := helpers.Templatize(receiveMessageWaitTimeSecondsTemp, helpers.Data{Obj: s.SQSQueue, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + receiveMessageWaitTimeSeconds := helpers.CreateParam("ReceiveMessageWaitTimeSeconds", helpers.Stringify(receiveMessageWaitTimeSecondsValue)) + usedeadletterQueueTemp := "{{.Obj.Spec.UsedeadletterQueue}}" + usedeadletterQueueValue, err := helpers.Templatize(usedeadletterQueueTemp, helpers.Data{Obj: s.SQSQueue, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + usedeadletterQueue := helpers.CreateParam("UsedeadletterQueue", helpers.Stringify(usedeadletterQueueValue)) + visibilityTimeoutTemp := "{{.Obj.Spec.VisibilityTimeout}}" + visibilityTimeoutValue, err := helpers.Templatize(visibilityTimeoutTemp, helpers.Data{Obj: s.SQSQueue, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + visibilityTimeout := helpers.CreateParam("VisibilityTimeout", helpers.Stringify(visibilityTimeoutValue)) + fifoQueueTemp := "{{.Obj.Spec.FifoQueue}}" + fifoQueueValue, err := helpers.Templatize(fifoQueueTemp, helpers.Data{Obj: s.SQSQueue, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + fifoQueue := helpers.CreateParam("FifoQueue", helpers.Stringify(fifoQueueValue)) parameters := []*cloudformation.Parameter{} parameters = append(parameters, resourceName) @@ -125,7 +170,7 @@ func (s *Cloudformation) UpdateStack(updated *awsV1alpha1.SQSQueue) (output *clo cftemplate := helpers.GetCloudFormationTemplate(s.config, "sqsqueue", updated.Spec.CloudFormationTemplateName, updated.Spec.CloudFormationTemplateNamespace) stackInputs := cloudformation.UpdateStackInput{ - StackName: aws.String(s.SQSQueue.Name), + StackName: aws.String(s.StackName()), TemplateURL: aws.String(cftemplate), NotificationARNs: []*string{ aws.String(s.topicARN), @@ -136,14 +181,54 @@ func (s *Cloudformation) UpdateStack(updated *awsV1alpha1.SQSQueue) (output *clo resourceVersion := helpers.CreateParam("ResourceVersion", s.SQSQueue.ResourceVersion) namespace := helpers.CreateParam("Namespace", s.SQSQueue.Namespace) clusterName := helpers.CreateParam("ClusterName", s.config.ClusterName) - contentBasedDeduplication := helpers.CreateParam("ContentBasedDeduplication", helpers.Stringify(updated.Spec.ContentBasedDeduplication)) - delaySeconds := helpers.CreateParam("DelaySeconds", helpers.Stringify(updated.Spec.DelaySeconds)) - maximumMessageSize := helpers.CreateParam("MaximumMessageSize", helpers.Stringify(updated.Spec.MaximumMessageSize)) - messageRetentionPeriod := helpers.CreateParam("MessageRetentionPeriod", helpers.Stringify(updated.Spec.MessageRetentionPeriod)) - receiveMessageWaitTimeSeconds := helpers.CreateParam("ReceiveMessageWaitTimeSeconds", helpers.Stringify(updated.Spec.ReceiveMessageWaitTimeSeconds)) - usedeadletterQueue := helpers.CreateParam("UsedeadletterQueue", helpers.Stringify(updated.Spec.UsedeadletterQueue)) - visibilityTimeout := helpers.CreateParam("VisibilityTimeout", helpers.Stringify(updated.Spec.VisibilityTimeout)) - fifoQueue := helpers.CreateParam("FifoQueue", helpers.Stringify(updated.Spec.FifoQueue)) + contentBasedDeduplicationTemp := "{{.Obj.Spec.ContentBasedDeduplication}}" + contentBasedDeduplicationValue, err := helpers.Templatize(contentBasedDeduplicationTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + contentBasedDeduplication := helpers.CreateParam("ContentBasedDeduplication", helpers.Stringify(contentBasedDeduplicationValue)) + delaySecondsTemp := "{{.Obj.Spec.DelaySeconds}}" + delaySecondsValue, err := helpers.Templatize(delaySecondsTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + delaySeconds := helpers.CreateParam("DelaySeconds", helpers.Stringify(delaySecondsValue)) + maximumMessageSizeTemp := "{{.Obj.Spec.MaximumMessageSize}}" + maximumMessageSizeValue, err := helpers.Templatize(maximumMessageSizeTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + maximumMessageSize := helpers.CreateParam("MaximumMessageSize", helpers.Stringify(maximumMessageSizeValue)) + messageRetentionPeriodTemp := "{{.Obj.Spec.MessageRetentionPeriod}}" + messageRetentionPeriodValue, err := helpers.Templatize(messageRetentionPeriodTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + messageRetentionPeriod := helpers.CreateParam("MessageRetentionPeriod", helpers.Stringify(messageRetentionPeriodValue)) + receiveMessageWaitTimeSecondsTemp := "{{.Obj.Spec.ReceiveMessageWaitTimeSeconds}}" + receiveMessageWaitTimeSecondsValue, err := helpers.Templatize(receiveMessageWaitTimeSecondsTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + receiveMessageWaitTimeSeconds := helpers.CreateParam("ReceiveMessageWaitTimeSeconds", helpers.Stringify(receiveMessageWaitTimeSecondsValue)) + usedeadletterQueueTemp := "{{.Obj.Spec.UsedeadletterQueue}}" + usedeadletterQueueValue, err := helpers.Templatize(usedeadletterQueueTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + usedeadletterQueue := helpers.CreateParam("UsedeadletterQueue", helpers.Stringify(usedeadletterQueueValue)) + visibilityTimeoutTemp := "{{.Obj.Spec.VisibilityTimeout}}" + visibilityTimeoutValue, err := helpers.Templatize(visibilityTimeoutTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + visibilityTimeout := helpers.CreateParam("VisibilityTimeout", helpers.Stringify(visibilityTimeoutValue)) + fifoQueueTemp := "{{.Obj.Spec.FifoQueue}}" + fifoQueueValue, err := helpers.Templatize(fifoQueueTemp, helpers.Data{Obj: updated, Config: s.config, Helpers: helpers.New()}) + if err != nil { + return output, err + } + fifoQueue := helpers.CreateParam("FifoQueue", helpers.Stringify(fifoQueueValue)) parameters := []*cloudformation.Parameter{} parameters = append(parameters, resourceName) @@ -184,8 +269,21 @@ func (s *Cloudformation) DeleteStack() (err error) { svc := cloudformation.New(sess) stackInputs := cloudformation.DeleteStackInput{} - stackInputs.SetStackName(s.SQSQueue.Name) + stackInputs.SetStackName(s.StackName()) _, err = svc.DeleteStack(&stackInputs) return } + +// WaitUntilStackDeleted will delete the stack +func (s *Cloudformation) WaitUntilStackDeleted() (err error) { + sess := s.config.AWSSession + svc := cloudformation.New(sess) + + stackInputs := cloudformation.DescribeStacksInput{ + StackName: aws.String(s.StackName()), + } + + err = svc.WaitUntilStackDeleteComplete(&stackInputs) + return +} diff --git a/pkg/operator/sqsqueue/controller.go b/pkg/operator/sqsqueue/controller.go index cafd9850b..cf0ed9eff 100644 --- a/pkg/operator/sqsqueue/controller.go +++ b/pkg/operator/sqsqueue/controller.go @@ -93,9 +93,26 @@ func QueueUpdater(config *config.Config, msg *queue.MessageBody) error { } if name != "" && namespace != "" { - err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) - if err != nil { - return err + if msg.ParsedMessage["ResourceStatus"] == "ROLLBACK_COMPLETE" { + err := deleteStack(config, name, namespace, msg.ParsedMessage["StackId"]) + if err != nil { + return err + } + } else if msg.ParsedMessage["ResourceStatus"] == "DELETE_COMPLETE" { + err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) + if err != nil { + return err + } + + err = incrementRollbackCount(config, name, namespace) + if err != nil { + return err + } + } else { + err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"]) + if err != nil { + return err + } } } @@ -104,7 +121,7 @@ func QueueUpdater(config *config.Config, msg *queue.MessageBody) error { func (c *Controller) onAdd(obj interface{}) { s := obj.(*awsV1alpha1.SQSQueue).DeepCopy() - if s.Status.ResourceStatus == "" { + if s.Status.ResourceStatus == "" || s.Status.ResourceStatus == "DELETE_COMPLETE" { cft := New(c.config, s, c.topicARN) output, err := cft.CreateStack() if err != nil { @@ -124,7 +141,11 @@ func (c *Controller) onAdd(obj interface{}) { func (c *Controller) onUpdate(oldObj, newObj interface{}) { oo := oldObj.(*awsV1alpha1.SQSQueue).DeepCopy() no := newObj.(*awsV1alpha1.SQSQueue).DeepCopy() - if helpers.IsStackComplete(oo.Status.ResourceStatus, false) { + + if no.Status.ResourceStatus == "DELETE_COMPLETE" { + c.onAdd(no) + } + if helpers.IsStackComplete(oo.Status.ResourceStatus, false) && !reflect.DeepEqual(oo.Spec, no.Spec) { cft := New(c.config, oo, c.topicARN) output, err := cft.UpdateStack(no) if err != nil { @@ -152,45 +173,84 @@ func (c *Controller) onDelete(obj interface{}) { c.config.Logger.Infof("deleted sqsqueue '%s'", s.Name) } +func incrementRollbackCount(config *config.Config, name string, namespace string) error { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.SQSQueues(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting sqsqueues") + return err + } + + resourceCopy := resource.DeepCopy() + resourceCopy.Spec.RollbackCount = resourceCopy.Spec.RollbackCount+1 + + _, err = clientSet.SQSQueues(namespace).Update(resourceCopy) + if err != nil { + logger.WithError(err).Error("error updating resource") + return err + } + return nil +} + func updateStatus(config *config.Config, name string, namespace string, stackID string, status string, reason string) error { - logger := config.Logger - clientSet, _ := awsclient.NewForConfig(config.RESTConfig) - resource, err := clientSet.SQSQueues(namespace).Get(name, metav1.GetOptions{}) + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.SQSQueues(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting sqsqueues") + return err + } + + resourceCopy := resource.DeepCopy() + resourceCopy.Status.ResourceStatus = status + resourceCopy.Status.ResourceStatusReason = reason + resourceCopy.Status.StackID = stackID + + if helpers.IsStackComplete(status, false) { + cft := New(config, resourceCopy, "") + outputs, err := cft.GetOutputs() if err != nil { - logger.WithError(err).Error("error getting sqsqueues") - return err + logger.WithError(err).Error("error getting outputs") } + resourceCopy.Output.QueueURL = outputs["QueueURL"] + resourceCopy.Output.QueueARN = outputs["QueueARN"] + resourceCopy.Output.QueueName = outputs["QueueName"] + resourceCopy.Output.DeadLetterQueueURL = outputs["DeadLetterQueueURL"] + resourceCopy.Output.DeadLetterQueueARN = outputs["DeadLetterQueueARN"] + resourceCopy.Output.DeadLetterQueueName = outputs["DeadLetterQueueName"] + } - resourceCopy := resource.DeepCopy() - resourceCopy.Status.ResourceStatus = status - resourceCopy.Status.ResourceStatusReason = reason - resourceCopy.Status.StackID = stackID + _, err = clientSet.SQSQueues(namespace).Update(resourceCopy) + if err != nil { + logger.WithError(err).Error("error updating resource") + return err + } - if helpers.IsStackComplete(status, true) { - cft := New(config, resourceCopy, "") - outputs, err := cft.GetOutputs() - if err != nil { - logger.WithError(err).Error("error getting outputs") - } - resourceCopy.Output.QueueURL = outputs["QueueURL"] - resourceCopy.Output.QueueARN = outputs["QueueARN"] - resourceCopy.Output.QueueName = outputs["QueueName"] - resourceCopy.Output.DeadLetterQueueURL = outputs["DeadLetterQueueURL"] - resourceCopy.Output.DeadLetterQueueARN = outputs["DeadLetterQueueARN"] - resourceCopy.Output.DeadLetterQueueName = outputs["DeadLetterQueueName"] - } + err = syncAdditionalResources(config, resourceCopy) + if err != nil { + logger.WithError(err).Info("error syncing resources") + } + return nil +} - _, err = clientSet.SQSQueues(namespace).Update(resourceCopy) - if err != nil { - logger.WithError(err).Error("error updating resource") - return err - } +func deleteStack(config *config.Config, name string, namespace string, stackID string) error { + logger := config.Logger + clientSet, _ := awsclient.NewForConfig(config.RESTConfig) + resource, err := clientSet.SQSQueues(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Error("error getting sqsqueues") + return err + } - err = syncAdditionalResources(config, resourceCopy) - if err != nil { - logger.WithError(err).Info("error syncing resources") - } - return nil + cft := New(config, resource, "") + err = cft.DeleteStack() + if err != nil { + return err + } + + err = cft.WaitUntilStackDeleted() + return err } func syncAdditionalResources(config *config.Config, s *awsV1alpha1.SQSQueue) (err error) {