Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add SQS option to SNS event #1065

Merged
merged 6 commits into from
Aug 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions docs/internals/generated_resources.rst
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ CloudFormation Resource Type Logical ID
AWS::ApiGateway::RestApi *ServerlessRestApi*
AWS::ApiGateway::Stage *ServerlessRestApi*\ **Prod**\ Stage
AWS::ApiGateway::Deployment *ServerlessRestApi*\ Deployment\ *SHA* (10 Digits of SHA256 of Swagger)
AWS::Lambda::Permissions MyFunction\ **ThumbnailApi**\ Permission\ **Prod**
AWS::Lambda::Permission MyFunction\ **ThumbnailApi**\ Permission\ **Prod**
(Prod is the default Stage Name for implicit APIs)
================================== ================================

Expand Down Expand Up @@ -155,7 +155,7 @@ Additional generated resources:
================================== ================================
CloudFormation Resource Type Logical ID
================================== ================================
AWS::Lambda::Permissions MyFunction\ **S3Trigger**\ Permission
AWS::Lambda::Permission MyFunction\ **S3Trigger**\ Permission
AWS::S3::Bucket Existing MyBucket resource is modified to append ``NotificationConfiguration``
property where the Lambda function trigger is defined
================================== ================================
Expand Down Expand Up @@ -184,17 +184,23 @@ Example:
Type: SNS
Properties:
Topic: arn:aws:sns:us-east-1:123456789012:my_topic
SqsSubscription: true
...

Additional generated resources:

================================== ================================
CloudFormation Resource Type Logical ID
================================== ================================
AWS::Lambda::Permissions MyFunction\ **MyTrigger**\ Permission
AWS::SNS::Subscription MyFunction\ **MyTrigger**
AWS::Lambda::Permission MyFunction\ **MyTrigger**\ Permission
AWS::Lambda::EventSourceMapping MyFunction\ **MyTrigger**\ EventSourceMapping
AWS::SNS::Subscription MyFunction\ **MyTrigger**
AWS::SQS::Queue MyFunction\ **MyTrigger**\ Queue
AWS::SQS::QueuePolicy MyFunction\ **MyTrigger**\ QueuePolicy
================================== ================================

NOTE: ``AWS::Lambda::Permission`` resources are only generated if SqsSubscription is ``false``. ``AWS::Lambda::EventSourceMapping``, ``AWS::SQS::Queue``, ``AWS::SQS::QueuePolicy`` resources are only generated if SqsSubscription is ``true``.

Kinesis
^^^^^^^

Expand All @@ -219,7 +225,7 @@ Additional generated resources:
================================== ================================
CloudFormation Resource Type Logical ID
================================== ================================
AWS::Lambda::Permissions MyFunction\ **MyTrigger**\ Permission
AWS::Lambda::Permission MyFunction\ **MyTrigger**\ Permission
AWS::Lambda::EventSourceMapping MyFunction\ **MyTrigger**
================================== ================================

Expand All @@ -246,7 +252,7 @@ Additional generated resources:
================================== ================================
CloudFormation Resource Type Logical ID
================================== ================================
AWS::Lambda::Permissions MyFunction\ **MyTrigger**\ Permission
AWS::Lambda::Permission MyFunction\ **MyTrigger**\ Permission
AWS::Lambda::EventSourceMapping MyFunction\ **MyTrigger**
================================== ================================

Expand Down Expand Up @@ -274,7 +280,7 @@ Additional generated resources:
================================== ================================
CloudFormation Resource Type Logical ID
================================== ================================
AWS::Lambda::Permissions MyFunction\ **MyTrigger**\ Permission
AWS::Lambda::Permission MyFunction\ **MyTrigger**\ Permission
AWS::Lambda::EventSourceMapping MyFunction\ **MyTrigger**
================================== ================================

Expand All @@ -301,7 +307,7 @@ Additional generated resources:
================================== ================================
CloudFormation Resource Type Logical ID
================================== ================================
AWS::Lambda::Permissions MyFunction\ **MyTimer**\ Permission
AWS::Lambda::Permission MyFunction\ **MyTimer**\ Permission
AWS::Events::Rule MyFunction\ **MyTimer**
================================== ================================

Expand Down Expand Up @@ -331,7 +337,7 @@ Additional generated resources:
================================== ================================
CloudFormation Resource Type Logical ID
================================== ================================
AWS::Lambda::Permissions MyFunction\ **OnTerminate**\ Permission
AWS::Lambda::Permission MyFunction\ **OnTerminate**\ Permission
AWS::Events::Rule MyFunction\ **OnTerminate**
================================== ================================

Expand Down
1 change: 1 addition & 0 deletions examples/2016-10-31/sns_sqs/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
transformed-cfn-template.yaml
19 changes: 19 additions & 0 deletions examples/2016-10-31/sns_sqs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# SNS-SQS Event Source Example

Example SAM template for processing messages on an SNS-SQS.

## Running the example

```bash
# Set YOUR_S3_ARTIFACTS_BUCKET to a bucket you own
YOUR_S3_ARTIFACTS_BUCKET='YOUR_S3_ARTIFACTS_BUCKET'; \
aws cloudformation package --template-file template.yaml --output-template-file cfn-transformed-template.yaml --s3-bucket $YOUR_S3_ARTIFACTS_BUCKET
aws cloudformation deploy --template-file ./cfn-transformed-template.yaml --stack-name lambda-sns-sqs-processor --capabilities CAPABILITY_IAM
```

After your CloudFormation Stack has completed creation, send a message to the SNS topic to see it in action:

```bash
YOUR_SNS_TOPIC_ARN=arn:aws:sns:us-east-1:[your_account_id]:[your_topic_name]; \
aws sqs send-message --target-arn $YOUR_SNS_TOPIC_ARN --message '{ "myMessage": "Hello SAM!" }'
```
7 changes: 7 additions & 0 deletions examples/2016-10-31/sns_sqs/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
async function handler (event, context) {
const records = event.Records
console.log(records)
return {}
}

module.exports.handler = handler
23 changes: 23 additions & 0 deletions examples/2016-10-31/sns_sqs/template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Example of processing messages on an SNS-SQS with Lambda
Resources:
MyTopic:
Type: AWS::SNS::Topic
MyFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./index.js
Handler: index.handler
Runtime: nodejs8.10
Events:
MySQSEvent:
Type: SNS
Properties:
Topic: !Ref MyTopic
SqsSubscription: true

Outputs:
MyTopic:
Description: "MyTopic ARN"
Value: !Ref MyTopic
58 changes: 50 additions & 8 deletions samtranslator/model/eventsources/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from samtranslator.model.sns import SNSSubscription
from samtranslator.model.lambda_ import LambdaPermission
from samtranslator.model.events import EventsRule
from samtranslator.model.eventsources.pull import SQS
from samtranslator.model.sqs import SQSQueue, SQSQueuePolicy, SQSQueuePolicies
from samtranslator.model.iot import IotTopicRule
from samtranslator.translator.arn_generator import ArnGenerator
from samtranslator.model.exceptions import InvalidEventException, InvalidResourceException
Expand Down Expand Up @@ -352,7 +354,8 @@ class SNS(PushEventSource):
property_types = {
'Topic': PropertyType(True, is_str()),
'Region': PropertyType(False, is_str()),
'FilterPolicy': PropertyType(False, dict_of(is_str(), list_of(one_of(is_str(), is_type(dict)))))
'FilterPolicy': PropertyType(False, dict_of(is_str(), list_of(one_of(is_str(), is_type(dict))))),
'SqsSubscription': PropertyType(False, is_type(bool))
}

def to_cloudformation(self, **kwargs):
Expand All @@ -363,28 +366,67 @@ def to_cloudformation(self, **kwargs):
:rtype: list
"""
function = kwargs.get('function')
role = kwargs.get('role')

if not function:
raise TypeError("Missing required keyword argument: function")

return [self._construct_permission(function, source_arn=self.Topic),
self._inject_subscription(function, self.Topic, self.Region, self.FilterPolicy)]
# SNS -> Lambda
if not self.SqsSubscription:
subscription = self._inject_subscription(
'lambda', function.get_runtime_attr("arn"),
self.Topic, self.Region, self.FilterPolicy, function.resource_attributes
)
return [self._construct_permission(function, source_arn=self.Topic), subscription]

def _inject_subscription(self, function, topic, region, filterPolicy):
# SNS -> SQS -> Lambda
resources = []
queue = self._inject_sqs_queue()
queue_policy = self._inject_sqs_queue_policy(self.Topic, queue)
subscription = self._inject_subscription(
'sqs', queue.get_runtime_attr('arn'),
self.Topic, self.Region, self.FilterPolicy, function.resource_attributes
)

resources = resources + self._inject_sqs_event_source_mapping(function, role, queue.get_runtime_attr('arn'))
resources.append(queue)
resources.append(queue_policy)
resources.append(subscription)
return resources

def _inject_subscription(self, protocol, endpoint, topic, region, filterPolicy, resource_attributes):
subscription = SNSSubscription(self.logical_id)
subscription.Protocol = 'lambda'
subscription.Endpoint = function.get_runtime_attr("arn")
subscription.Protocol = protocol
subscription.Endpoint = endpoint
subscription.TopicArn = topic
if region is not None:
subscription.Region = region
if CONDITION in function.resource_attributes:
subscription.set_resource_attribute(CONDITION, function.resource_attributes[CONDITION])
if CONDITION in resource_attributes:
subscription.set_resource_attribute(CONDITION, resource_attributes[CONDITION])

if filterPolicy is not None:
subscription.FilterPolicy = filterPolicy

return subscription

def _inject_sqs_queue(self):
return SQSQueue(self.logical_id + 'Queue')

def _inject_sqs_event_source_mapping(self, function, role, queue_arn):
event_source = SQS(self.logical_id + 'EventSourceMapping')
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct way to create LambdaEventSourceMapping resource?
Should I add LambdaEventSourceMapping and put linked_policy to IAM Role directory instead use SQS object?

I think the logic to create LambdaEventSourceMapping and linked_policy is completely same as SQS pull event. So I use SQS object.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like that is the correct way to add this event source mapping 👍

event_source.Queue = queue_arn
event_source.BatchSize = 10
event_source.Enabled = True
return event_source.to_cloudformation(function=function, role=role)

def _inject_sqs_queue_policy(self, topic_arn, queue):
policy = SQSQueuePolicy(self.logical_id + 'QueuePolicy')
policy.PolicyDocument = SQSQueuePolicies.sns_topic_send_message_role_policy(
topic_arn, queue.get_runtime_attr('arn')
)
policy.Queues = [queue.get_runtime_attr('queue_url')]
return policy


class Api(PushEventSource):
"""Api method event source for SAM Functions."""
Expand Down
44 changes: 44 additions & 0 deletions samtranslator/model/sqs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from samtranslator.model import PropertyType, Resource
from samtranslator.model.types import is_type, list_of
from samtranslator.model.intrinsics import fnGetAtt, ref


class SQSQueue(Resource):
resource_type = 'AWS::SQS::Queue'
property_types = {
}
runtime_attrs = {
"queue_url": lambda self: ref(self.logical_id),
"arn": lambda self: fnGetAtt(self.logical_id, "Arn"),
}


class SQSQueuePolicy(Resource):
resource_type = 'AWS::SQS::QueuePolicy'
property_types = {
'PolicyDocument': PropertyType(True, is_type(dict)),
'Queues': PropertyType(True, list_of(str)),
}
runtime_attrs = {
"arn": lambda self: fnGetAtt(self.logical_id, "Arn")
}


class SQSQueuePolicies:
@classmethod
def sns_topic_send_message_role_policy(cls, topic_arn, queue_arn):
document = {
'Version': '2012-10-17',
'Statement': [{
'Action': 'sqs:SendMessage',
'Effect': 'Allow',
'Principal': '*',
'Resource': queue_arn,
'Condition': {
'ArnEquals': {
'aws:SourceArn': topic_arn
}
}
}]
}
return document
23 changes: 23 additions & 0 deletions tests/translator/input/sns_sqs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Resources:
SaveNotificationFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: s3://sam-demo-bucket/notifications.zip
Handler: index.save_notification
Runtime: nodejs8.10
Events:
NotificationTopic:
Type: SNS
Properties:
Topic: !Ref Notifications
SqsSubscription: true
FilterPolicy:
store:
- example_corp
price_usd:
- numeric:
- ">="
- 100

Notifications:
Type: AWS::SNS::Topic
Loading