Skip to content

Commit

Permalink
Fix DestinationConfig in streaming event sources (#2215)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawflau authored Nov 15, 2021
1 parent cc28051 commit 1174f0f
Show file tree
Hide file tree
Showing 13 changed files with 287 additions and 4 deletions.
8 changes: 4 additions & 4 deletions samtranslator/model/eventsources/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ def to_cloudformation(self, **kwargs):

destination_config_policy = None
if self.DestinationConfig:
if self.DestinationConfig.get("OnFailure") is None:
raise InvalidEventException(self.logical_id, "'OnFailure' is a required field for 'DestinationConfig'")

# `Type` property is for sam to attach the right policies
destination_type = self.DestinationConfig.get("OnFailure").get("Type")

Expand All @@ -120,10 +123,6 @@ def to_cloudformation(self, **kwargs):
# the values 'SQS' and 'SNS' are allowed. No intrinsics are allowed
if destination_type not in ["SQS", "SNS"]:
raise InvalidEventException(self.logical_id, "The only valid values for 'Type' are 'SQS' and 'SNS'")
if self.DestinationConfig.get("OnFailure") is None:
raise InvalidEventException(
self.logical_id, "'OnFailure' is a required field for " "'DestinationConfig'"
)
if destination_type == "SQS":
queue_arn = self.DestinationConfig.get("OnFailure").get("Destination")
destination_config_policy = IAMRolePolicies().sqs_send_message_role_policy(
Expand All @@ -134,6 +133,7 @@ def to_cloudformation(self, **kwargs):
destination_config_policy = IAMRolePolicies().sns_publish_role_policy(
sns_topic_arn, self.logical_id
)

lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig

if "role" in kwargs:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Parameters:
MyBatchingWindowParam:
Type: Number
Default: 45
Description: parameter for batching window in seconds

Resources:
MyFunction:
Type: AWS::Serverless::Function
Properties:
Handler: index.handler
InlineCode: |
exports.handler = async (event) => {
return {
statusCode: 200,
body: JSON.stringify(event),
headers: {}
}
}
Runtime: nodejs12.x
Policies:
- SQSSendMessagePolicy:
QueueName: !GetAtt MySqsQueue.QueueName
Events:
StreamEvent:
Type: Kinesis
Properties:
Stream: !GetAtt KinesisStream.Arn
MaximumBatchingWindowInSeconds: !Ref MyBatchingWindowParam
StartingPosition: LATEST
DestinationConfig:
OnFailure:
Type: INVALID_VALID
Destination: !Ref MySnsTopic

KinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1

MySnsTopic:
Type: AWS::SNS::Topic
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Parameters:
MyBatchingWindowParam:
Type: Number
Default: 45
Description: parameter for batching window in seconds

Resources:
MyFunction:
Type: AWS::Serverless::Function
Properties:
Handler: index.handler
InlineCode: |
exports.handler = async (event) => {
return {
statusCode: 200,
body: JSON.stringify(event),
headers: {}
}
}
Runtime: nodejs12.x
Policies:
- SQSSendMessagePolicy:
QueueName: !GetAtt MySqsQueue.QueueName
Events:
StreamEvent:
Type: Kinesis
Properties:
Stream: !GetAtt KinesisStream.Arn
MaximumBatchingWindowInSeconds: !Ref MyBatchingWindowParam
StartingPosition: LATEST
DestinationConfig:
InvalidConfig:
Type: SNS
Destination: !Ref MySnsTopic

KinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1

MySnsTopic:
Type: AWS::SNS::Topic
16 changes: 16 additions & 0 deletions tests/translator/input/function_with_event_source_mapping.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,22 @@ Resources:
OnFailure:
Type: SQS
Destination: !GetAtt MySqsQueue.Arn
StreamEventWithoutDestinationConfigType:
Type: Kinesis
Properties:
Stream: !GetAtt KinesisStream1.Arn
MaximumBatchingWindowInSeconds: !Ref MyBatchingWindowParam
StartingPosition: LATEST
DestinationConfig:
OnFailure:
Destination: !Ref MySnsTopic
StreamEventWithEmptyDestinationConfig:
Type: Kinesis
Properties:
Stream: !GetAtt KinesisStream1.Arn
MaximumBatchingWindowInSeconds: !Ref MyBatchingWindowParam
StartingPosition: LATEST
DestinationConfig:

KinesisStream:
Type: AWS::Kinesis::Stream
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"errors": [
{
"errorMessage": "Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. The only valid values for 'Type' are 'SQS' and 'SNS'"
}
],
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. The only valid values for 'Type' are 'SQS' and 'SNS'"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"errors": [
{
"errorMessage": "Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. 'OnFailure' is a required field for 'DestinationConfig'"
}
],
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. 'OnFailure' is a required field for 'DestinationConfig'"
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,49 @@
}
}
},
"MyFunctionForBatchingExampleStreamEventWithoutDestinationConfigType": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"MaximumBatchingWindowInSeconds": {
"Ref": "MyBatchingWindowParam"
},
"EventSourceArn": {
"Fn::GetAtt": [
"KinesisStream1",
"Arn"
]
},
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "LATEST",
"DestinationConfig": {
"OnFailure": {
"Destination": {
"Ref": "MySnsTopic"
}
}
}
}
},
"MyFunctionForBatchingExampleStreamEventWithEmptyDestinationConfig": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"MaximumBatchingWindowInSeconds": {
"Ref": "MyBatchingWindowParam"
},
"EventSourceArn": {
"Fn::GetAtt": [
"KinesisStream1",
"Arn"
]
},
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "LATEST"
}
},
"KinesisStream": {
"Type": "AWS::Kinesis::Stream",
"Properties": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"errors": [
{
"errorMessage": "Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. The only valid values for 'Type' are 'SQS' and 'SNS'"
}
],
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. The only valid values for 'Type' are 'SQS' and 'SNS'"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"errors": [
{
"errorMessage": "Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. 'OnFailure' is a required field for 'DestinationConfig'"
}
],
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. 'OnFailure' is a required field for 'DestinationConfig'"
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,49 @@
}
}
},
"MyFunctionForBatchingExampleStreamEventWithoutDestinationConfigType": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"MaximumBatchingWindowInSeconds": {
"Ref": "MyBatchingWindowParam"
},
"EventSourceArn": {
"Fn::GetAtt": [
"KinesisStream1",
"Arn"
]
},
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "LATEST",
"DestinationConfig": {
"OnFailure": {
"Destination": {
"Ref": "MySnsTopic"
}
}
}
}
},
"MyFunctionForBatchingExampleStreamEventWithEmptyDestinationConfig": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"MaximumBatchingWindowInSeconds": {
"Ref": "MyBatchingWindowParam"
},
"EventSourceArn": {
"Fn::GetAtt": [
"KinesisStream1",
"Arn"
]
},
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "LATEST"
}
},
"KinesisStream": {
"Type": "AWS::Kinesis::Stream",
"Properties": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"errors": [
{
"errorMessage": "Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. The only valid values for 'Type' are 'SQS' and 'SNS'"
}
],
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. The only valid values for 'Type' are 'SQS' and 'SNS'"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"errors": [
{
"errorMessage": "Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. 'OnFailure' is a required field for 'DestinationConfig'"
}
],
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. 'OnFailure' is a required field for 'DestinationConfig'"
}
43 changes: 43 additions & 0 deletions tests/translator/output/function_with_event_source_mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,49 @@
}
}
},
"MyFunctionForBatchingExampleStreamEventWithoutDestinationConfigType": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"MaximumBatchingWindowInSeconds": {
"Ref": "MyBatchingWindowParam"
},
"EventSourceArn": {
"Fn::GetAtt": [
"KinesisStream1",
"Arn"
]
},
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "LATEST",
"DestinationConfig": {
"OnFailure": {
"Destination": {
"Ref": "MySnsTopic"
}
}
}
}
},
"MyFunctionForBatchingExampleStreamEventWithEmptyDestinationConfig": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"MaximumBatchingWindowInSeconds": {
"Ref": "MyBatchingWindowParam"
},
"EventSourceArn": {
"Fn::GetAtt": [
"KinesisStream1",
"Arn"
]
},
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "LATEST"
}
},
"KinesisStream": {
"Type": "AWS::Kinesis::Stream",
"Properties": {
Expand Down

0 comments on commit 1174f0f

Please sign in to comment.