Skip to content

Commit

Permalink
feat: Add an existing SQS queue option to SNS event (#1231)
Browse files Browse the repository at this point in the history
  • Loading branch information
53ningen authored and praneetap committed Dec 13, 2019
1 parent fad1edb commit 30e33b4
Show file tree
Hide file tree
Showing 12 changed files with 942 additions and 17 deletions.
29 changes: 26 additions & 3 deletions docs/internals/generated_resources.rst
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,12 @@ Example:
Type: SNS
Properties:
Topic: arn:aws:sns:us-east-1:123456789012:my_topic
SqsSubscription: true
SqsSubscription:
QueuePolicyLogicalId: CustomQueuePolicyLogicalId
QueueArn: !GetAtt MyCustomQueue.Arn
QueueUrl: !Ref MyCustomQueue
BatchSize: 5
Enabled: true
...
Additional generated resources:
Expand All @@ -275,6 +280,24 @@ AWS::SQS::QueuePolicy MyFunction\ **MyTrigger**\ QueuePolicy

NOTE: ``AWS::Lambda::Permission`` resources are only generated if SqsSubscription is ``false``. ``AWS::Lambda::EventSourceMapping``, ``AWS::SQS::Queue``, ``AWS::SQS::QueuePolicy`` resources are only generated if SqsSubscription is ``true``.

``AWS::SQS::Queue`` resources are only generated if SqsSubscription is ``true``.

Example:

.. code:: yaml
MyFunction:
Type: AWS::Serverless::Function
Properties:
...
Events:
MyTrigger:
Type: SNS
Properties:
Topic: arn:aws:sns:us-east-1:123456789012:my_topic
SqsSubscription: true
...
Kinesis
^^^^^^^

Expand Down Expand Up @@ -481,9 +504,9 @@ AWS::ApiGateway::Stage MyApi\ **dev**\ Stage
AWS::ApiGateway::Deployment MyApi\ Deployment\ *SHA* (10 Digits of SHA256 of DefinitionUri or DefinitionBody value)
================================== ================================

NOTE: By just specifying AWS::Serverless::Api resource, SAM will *not* add permission for API Gateway to invoke the
NOTE: By just specifying AWS::Serverless::Api resource, SAM will *not* add permission for API Gateway to invoke the
the Lambda Function backing the APIs. You should explicitly re-define all APIs under ``Events`` section of the
AWS::Serverless::Function resource but include a `RestApiId` property that references the AWS::Serverless::Api
AWS::Serverless::Function resource but include a `RestApiId` property that references the AWS::Serverless::Api
resource. SAM will add permission for these APIs to invoke the function.

Example:
Expand Down
57 changes: 43 additions & 14 deletions samtranslator/model/eventsources/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ class SNS(PushEventSource):
'Topic': PropertyType(True, is_str()),
'Region': PropertyType(False, is_str()),
'FilterPolicy': PropertyType(False, dict_of(is_str(), list_of(one_of(is_str(), is_type(dict))))),
'SqsSubscription': PropertyType(False, is_type(bool))
'SqsSubscription': PropertyType(False, one_of(is_type(bool), is_type(dict)))
}

def to_cloudformation(self, **kwargs):
Expand All @@ -398,17 +398,46 @@ def to_cloudformation(self, **kwargs):
)
return [self._construct_permission(function, source_arn=self.Topic), subscription]

# SNS -> SQS -> Lambda
# SNS -> SQS(Create New) -> Lambda
if isinstance(self.SqsSubscription, bool):
resources = []
queue = self._inject_sqs_queue()
queue_arn = queue.get_runtime_attr('arn')
queue_url = queue.get_runtime_attr('queue_url')

queue_policy = self._inject_sqs_queue_policy(self.Topic, queue_arn, queue_url)
subscription = self._inject_subscription(
'sqs', queue_arn,
self.Topic, self.Region, self.FilterPolicy, function.resource_attributes
)
event_source = self._inject_sqs_event_source_mapping(function, role, queue_arn)

resources = resources + event_source
resources.append(queue)
resources.append(queue_policy)
resources.append(subscription)
return resources

# SNS -> SQS(Existing) -> Lambda
resources = []
queue = self._inject_sqs_queue()
queue_policy = self._inject_sqs_queue_policy(self.Topic, queue)
queue_arn = self.SqsSubscription.get('QueueArn', None)
queue_url = self.SqsSubscription.get('QueueUrl', None)
if not queue_arn or not queue_url:
raise InvalidEventException(
self.relative_id, "No QueueARN or QueueURL provided.")

queue_policy_logical_id = self.SqsSubscription.get('QueuePolicyLogicalId', None)
batch_size = self.SqsSubscription.get('BatchSize', None)
enabled = self.SqsSubscription.get('Enabled', None)

queue_policy = self._inject_sqs_queue_policy(self.Topic, queue_arn, queue_url, queue_policy_logical_id)
subscription = self._inject_subscription(
'sqs', queue.get_runtime_attr('arn'),
'sqs', queue_arn,
self.Topic, self.Region, self.FilterPolicy, function.resource_attributes
)
event_source = self._inject_sqs_event_source_mapping(function, role, queue_arn, batch_size, enabled)

resources = resources + self._inject_sqs_event_source_mapping(function, role, queue.get_runtime_attr('arn'))
resources.append(queue)
resources = resources + event_source
resources.append(queue_policy)
resources.append(subscription)
return resources
Expand All @@ -431,19 +460,19 @@ def _inject_subscription(self, protocol, endpoint, topic, region, filterPolicy,
def _inject_sqs_queue(self):
return SQSQueue(self.logical_id + 'Queue')

def _inject_sqs_event_source_mapping(self, function, role, queue_arn):
def _inject_sqs_event_source_mapping(self, function, role, queue_arn, batch_size=None, enabled=None):
event_source = SQS(self.logical_id + 'EventSourceMapping')
event_source.Queue = queue_arn
event_source.BatchSize = 10
event_source.Enabled = True
event_source.BatchSize = batch_size or 10
event_source.Enabled = enabled or True
return event_source.to_cloudformation(function=function, role=role)

def _inject_sqs_queue_policy(self, topic_arn, queue):
policy = SQSQueuePolicy(self.logical_id + 'QueuePolicy')
def _inject_sqs_queue_policy(self, topic_arn, queue_arn, queue_url, logical_id=None):
policy = SQSQueuePolicy(logical_id or self.logical_id + 'QueuePolicy')
policy.PolicyDocument = SQSQueuePolicies.sns_topic_send_message_role_policy(
topic_arn, queue.get_runtime_attr('arn')
topic_arn, queue_arn
)
policy.Queues = [queue.get_runtime_attr('queue_url')]
policy.Queues = [queue_url]
return policy


Expand Down
26 changes: 26 additions & 0 deletions tests/model/eventsources/test_sns_event_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,29 @@ def test_to_cloudformation_passes_the_filter_policy(self):

def test_to_cloudformation_throws_when_no_function(self):
self.assertRaises(TypeError, self.sns_event_source.to_cloudformation)

def test_to_cloudformation_throws_when_queue_url_or_queue_arn_not_given(self):
sqsSubscription = {
'BatchSize': 5
}
self.sns_event_source.SqsSubscription = sqsSubscription
self.assertRaises(TypeError, self.sns_event_source.to_cloudformation)

def test_to_cloudformation_when_sqs_subscription_disable(self):
sqsSubscription = False
self.sns_event_source.SqsSubscription = sqsSubscription

resources = self.sns_event_source.to_cloudformation(
function=self.function)
self.assertEqual(len(resources), 2)
self.assertEqual(resources[0].resource_type,
'AWS::Lambda::Permission')
self.assertEqual(resources[1].resource_type,
'AWS::SNS::Subscription')

subscription = resources[1]
self.assertEqual(subscription.TopicArn, 'arn:aws:sns:MyTopic')
self.assertEqual(subscription.Protocol, 'lambda')
self.assertEqual(subscription.Endpoint, 'arn:aws:lambda:mock')
self.assertIsNone(subscription.Region)
self.assertIsNone(subscription.FilterPolicy)
31 changes: 31 additions & 0 deletions tests/translator/input/sns_existing_sqs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
Resources:
SaveNotificationFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: s3://sam-demo-bucket/notifications.zip
Handler: index.save_notification
Runtime: nodejs8.10
Events:
NotificationTopic:
Type: SNS
Properties:
Topic: !Ref Notifications
SqsSubscription:
QueueUrl: !Ref Queue
QueueArn: !GetAtt Queue.Arn
QueuePolicyLogicalId: NotificationA
BatchSize: 8
Enabled: true
FilterPolicy:
store:
- example_corp
price_usd:
- numeric:
- ">="
- 100

Notifications:
Type: AWS::SNS::Topic

Queue:
Type: AWS::SQS::Queue
28 changes: 28 additions & 0 deletions tests/translator/input/sns_outside_sqs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
Resources:
SaveNotificationFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: s3://sam-demo-bucket/notifications.zip
Handler: index.save_notification
Runtime: nodejs8.10
Events:
NotificationTopic:
Type: SNS
Properties:
Topic: !Ref Notifications
SqsSubscription:
QueueUrl: https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue
QueueArn: arn:aws:sqs:us-east-1:123456789012:MyQueue
QueuePolicyLogicalId: NotificationB
BatchSize: 8
Enabled: true
FilterPolicy:
store:
- example_corp
price_usd:
- numeric:
- ">="
- 100

Notifications:
Type: AWS::SNS::Topic
141 changes: 141 additions & 0 deletions tests/translator/output/aws-cn/sns_existing_sqs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
{
"Resources": {
"Queue": {
"Type": "AWS::SQS::Queue"
},
"Notifications": {
"Type": "AWS::SNS::Topic"
},
"NotificationA": {
"Type": "AWS::SQS::QueuePolicy",
"Properties": {
"Queues": [
{
"Ref": "Queue"
}
],
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": "sqs:SendMessage",
"Resource": {
"Fn::GetAtt": [
"Queue",
"Arn"
]
},
"Effect": "Allow",
"Condition": {
"ArnEquals": {
"aws:SourceArn": {
"Ref": "Notifications"
}
}
},
"Principal": "*"
}
]
}
}
},
"SaveNotificationFunctionNotificationTopic": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"FilterPolicy": {
"price_usd": [
{
"numeric": [
">=",
100
]
}
],
"store": [
"example_corp"
]
},
"Endpoint": {
"Fn::GetAtt": [
"Queue",
"Arn"
]
},
"Protocol": "sqs",
"TopicArn": {
"Ref": "Notifications"
}
}
},
"SaveNotificationFunctionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
]
},
"ManagedPolicyArns": [
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaSQSQueueExecutionRole"
],
"Tags": [
{
"Value": "SAM",
"Key": "lambda:createdBy"
}
]
}
},
"SaveNotificationFunctionNotificationTopicEventSourceMapping": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"BatchSize": 8,
"Enabled": true,
"FunctionName": {
"Ref": "SaveNotificationFunction"
},
"EventSourceArn": {
"Fn::GetAtt": [
"Queue",
"Arn"
]
}
}
},
"SaveNotificationFunction": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Handler": "index.save_notification",
"Code": {
"S3Bucket": "sam-demo-bucket",
"S3Key": "notifications.zip"
},
"Role": {
"Fn::GetAtt": [
"SaveNotificationFunctionRole",
"Arn"
]
},
"Runtime": "nodejs8.10",
"Tags": [
{
"Value": "SAM",
"Key": "lambda:createdBy"
}
]
}
}
}
}
Loading

0 comments on commit 30e33b4

Please sign in to comment.