Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Move some PullEventSource validations to subclasses #2767

Merged
merged 2 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 80 additions & 39 deletions samtranslator/model/eventsources/pull.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, List, Optional

from samtranslator.metrics.method_decorator import cw_timer
Expand All @@ -15,7 +16,7 @@
from samtranslator.validator.value_validator import sam_expect


class PullEventSource(ResourceMacro):
class PullEventSource(ResourceMacro, metaclass=ABCMeta):
"""Base class for pull event sources for SAM Functions.

The pull events are Kinesis Streams, DynamoDB Streams, Kafka Topics, Amazon MQ Queues and SQS Queues. All of these correspond to an
Expand All @@ -32,11 +33,8 @@ class PullEventSource(ResourceMacro):
# line to avoid any potential behavior change.
# TODO: Make `PullEventSource` an abstract class and not giving `resource_type` initial value.
resource_type: str = None # type: ignore
requires_stream_queue_broker = True
relative_id: str # overriding the Optional[str]: for event, relative id is not None
property_types = {
"Stream": PropertyType(False, IS_STR),
"Queue": PropertyType(False, IS_STR),
property_types: Dict[str, PropertyType] = {
"BatchSize": PropertyType(False, is_type(int)),
"StartingPosition": PassThroughProperty(False),
"StartingPositionTimestamp": PassThroughProperty(False),
Expand All @@ -48,7 +46,6 @@ class PullEventSource(ResourceMacro):
"DestinationConfig": PropertyType(False, IS_DICT),
"ParallelizationFactor": PropertyType(False, is_type(int)),
"Topics": PropertyType(False, is_type(list)),
"Broker": PropertyType(False, IS_STR),
"Queues": PropertyType(False, is_type(list)),
"SourceAccessConfigurations": PropertyType(False, is_type(list)),
"SecretsManagerKmsKeyId": PropertyType(False, IS_STR),
Expand All @@ -59,8 +56,6 @@ class PullEventSource(ResourceMacro):
"ConsumerGroupId": PropertyType(False, IS_STR),
}

Stream: Optional[Intrinsicable[str]]
Queue: Optional[Intrinsicable[str]]
BatchSize: Optional[Intrinsicable[int]]
StartingPosition: Optional[PassThrough]
StartingPositionTimestamp: Optional[PassThrough]
Expand All @@ -72,7 +67,6 @@ class PullEventSource(ResourceMacro):
DestinationConfig: Optional[Dict[str, Any]]
ParallelizationFactor: Optional[Intrinsicable[int]]
Topics: Optional[List[Any]]
Broker: Optional[Intrinsicable[str]]
Queues: Optional[List[Any]]
SourceAccessConfigurations: Optional[List[Any]]
SecretsManagerKmsKeyId: Optional[str]
Expand All @@ -82,11 +76,17 @@ class PullEventSource(ResourceMacro):
FilterCriteria: Optional[Dict[str, Any]]
ConsumerGroupId: Optional[Intrinsicable[str]]

def get_policy_arn(self): # type: ignore[no-untyped-def]
raise NotImplementedError("Subclass must implement this method")
@abstractmethod
def get_policy_arn(self) -> Optional[str]:
"""Policy to be added to the role (if a role applies)."""

def get_policy_statements(self): # type: ignore[no-untyped-def]
raise NotImplementedError("Subclass must implement this method")
@abstractmethod
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
"""Inline policy statements to be added to the role (if a role applies)."""

@abstractmethod
def get_event_source_arn(self) -> Optional[PassThrough]:
"""Return the value to assign to lambda event source mapping's EventSourceArn."""

@cw_timer(prefix=FUNCTION_EVETSOURCE_METRIC_PREFIX)
def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
Expand Down Expand Up @@ -115,17 +115,8 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
except NotImplementedError:
function_name_or_arn = function.get_runtime_attr("arn")

if self.requires_stream_queue_broker and not self.Stream and not self.Queue and not self.Broker:
raise InvalidEventException(
self.relative_id,
"No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.",
)

if self.Stream and not self.StartingPosition:
raise InvalidEventException(self.relative_id, "StartingPosition is required for Kinesis, DynamoDB and MSK.")

lambda_eventsourcemapping.FunctionName = function_name_or_arn
lambda_eventsourcemapping.EventSourceArn = self.Stream or self.Queue or self.Broker
lambda_eventsourcemapping.EventSourceArn = self.get_event_source_arn()
lambda_eventsourcemapping.StartingPosition = self.StartingPosition
lambda_eventsourcemapping.StartingPositionTimestamp = self.StartingPositionTimestamp
lambda_eventsourcemapping.BatchSize = self.BatchSize
Expand Down Expand Up @@ -202,8 +193,8 @@ def _link_policy(self, role, destination_config_policy=None): # type: ignore[no

:param model.iam.IAMRole role: the execution role generated for the function
"""
policy_arn = self.get_policy_arn() # type: ignore[no-untyped-call]
policy_statements = self.get_policy_statements() # type: ignore[no-untyped-call]
policy_arn = self.get_policy_arn()
policy_statements = self.get_policy_statements()
if role is not None:
if policy_arn is not None and policy_arn not in role.ManagedPolicyArns:
role.ManagedPolicyArns.append(policy_arn)
Expand Down Expand Up @@ -250,47 +241,86 @@ class Kinesis(PullEventSource):
"""Kinesis event source."""

resource_type = "Kinesis"
property_types: Dict[str, PropertyType] = {
**PullEventSource.property_types,
"Stream": PassThroughProperty(True),
"StartingPosition": PassThroughProperty(True),
}

Stream: PassThrough

def get_event_source_arn(self) -> Optional[PassThrough]:
return self.Stream

def get_policy_arn(self): # type: ignore[no-untyped-def]
def get_policy_arn(self) -> Optional[str]:
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaKinesisExecutionRole")

def get_policy_statements(self): # type: ignore[no-untyped-def]
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
return None


class DynamoDB(PullEventSource):
"""DynamoDB Streams event source."""

resource_type = "DynamoDB"
property_types: Dict[str, PropertyType] = {
**PullEventSource.property_types,
"Stream": PassThroughProperty(True),
"StartingPosition": PassThroughProperty(True),
}

Stream: PassThrough

def get_event_source_arn(self) -> Optional[PassThrough]:
return self.Stream

def get_policy_arn(self): # type: ignore[no-untyped-def]
def get_policy_arn(self) -> Optional[str]:
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaDynamoDBExecutionRole")

def get_policy_statements(self): # type: ignore[no-untyped-def]
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
return None


class SQS(PullEventSource):
"""SQS Queue event source."""

resource_type = "SQS"
property_types: Dict[str, PropertyType] = {
**PullEventSource.property_types,
"Queue": PassThroughProperty(True),
}

Queue: PassThrough

def get_event_source_arn(self) -> Optional[PassThrough]:
return self.Queue

def get_policy_arn(self): # type: ignore[no-untyped-def]
def get_policy_arn(self) -> Optional[str]:
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaSQSQueueExecutionRole")

def get_policy_statements(self): # type: ignore[no-untyped-def]
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
return None


class MSK(PullEventSource):
"""MSK event source."""

resource_type = "MSK"
property_types: Dict[str, PropertyType] = {
**PullEventSource.property_types,
"Stream": PassThroughProperty(True),
"StartingPosition": PassThroughProperty(True),
}

Stream: PassThrough

def get_event_source_arn(self) -> Optional[PassThrough]:
return self.Stream

def get_policy_arn(self): # type: ignore[no-untyped-def]
def get_policy_arn(self) -> Optional[str]:
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaMSKExecutionRole")

def get_policy_statements(self): # type: ignore[no-untyped-def]
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
if self.SourceAccessConfigurations:
for conf in self.SourceAccessConfigurations:
# Lambda does not support multiple CLIENT_CERTIFICATE_TLS_AUTH configurations
Expand All @@ -312,18 +342,27 @@ def get_policy_statements(self): # type: ignore[no-untyped-def]
}
]

return None
return None


class MQ(PullEventSource):
"""MQ event source."""

resource_type = "MQ"
property_types: Dict[str, PropertyType] = {
**PullEventSource.property_types,
"Broker": PassThroughProperty(True),
}

Broker: PassThrough

def get_policy_arn(self): # type: ignore[no-untyped-def]
def get_event_source_arn(self) -> Optional[PassThrough]:
return self.Broker

def get_policy_arn(self) -> Optional[str]:
return None

def get_policy_statements(self): # type: ignore[no-untyped-def]
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
if not self.SourceAccessConfigurations:
raise InvalidEventException(
self.relative_id,
Expand Down Expand Up @@ -404,7 +443,6 @@ class SelfManagedKafka(PullEventSource):
"""

resource_type = "SelfManagedKafka"
requires_stream_queue_broker = False
AUTH_MECHANISM = [
"SASL_SCRAM_256_AUTH",
"SASL_SCRAM_512_AUTH",
Expand All @@ -413,10 +451,13 @@ class SelfManagedKafka(PullEventSource):
"SERVER_ROOT_CA_CERTIFICATE",
]

def get_policy_arn(self): # type: ignore[no-untyped-def]
def get_event_source_arn(self) -> Optional[PassThrough]:
return None

def get_policy_arn(self) -> Optional[str]:
return None

def get_policy_statements(self): # type: ignore[no-untyped-def]
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
if not self.KafkaBootstrapServers:
raise InvalidEventException(
self.relative_id,
Expand Down
7 changes: 1 addition & 6 deletions tests/translator/output/error_missing_broker.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
{
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MQFunction] is invalid. Event with id [MyMQQueue] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.",
"errors": [
{
"errorMessage": "Resource with id [MQFunction] is invalid. Event with id [MyMQQueue] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided."
}
]
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MQFunctionMyMQQueue] is invalid. Missing required property 'Broker'."
}
7 changes: 1 addition & 6 deletions tests/translator/output/error_missing_queue.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
{
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [SQSFunction] is invalid. Event with id [MySqsQueue] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.",
"errors": [
{
"errorMessage": "Resource with id [SQSFunction] is invalid. Event with id [MySqsQueue] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided."
}
]
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [SQSFunctionMySqsQueue] is invalid. Missing required property 'Queue'."
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
{
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [KinesisFunction] is invalid. Event with id [MyKinesisStream] is invalid. StartingPosition is required for Kinesis, DynamoDB and MSK.",
"errors": [
{
"errorMessage": "Resource with id [KinesisFunction] is invalid. Event with id [MyKinesisStream] is invalid. StartingPosition is required for Kinesis, DynamoDB and MSK."
}
]
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [KinesisFunctionMyKinesisStream] is invalid. Missing required property 'StartingPosition'."
}
7 changes: 1 addition & 6 deletions tests/translator/output/error_missing_stream.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
{
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [DynamoDBFunction] is invalid. Event with id [MyDDBStream] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.",
"errors": [
{
"errorMessage": "Resource with id [DynamoDBFunction] is invalid. Event with id [MyDDBStream] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided."
}
]
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [DynamoDBFunctionMyDDBStream] is invalid. Missing required property 'Stream'."
}